一文帶你理解並實戰協同過濾!Spark分佈式機器學習系列

​ Spark是一個極為優秀的大數據框架,在大數據批處理上基本無人能敵,流處理上也有一席之地,機器學習則是當前正火熱AI人工智能的驅動引擎,在大數據場景下如何發揮AI技術成為優秀的大數據挖掘工程師必備技能。

本文采用的組件版本為:Ubuntu 19.10、Jdk 1.8.0_241、Scala 2.11.12、Hadoop 3.2.1、Spark 2.4.5,老規矩先開啟一系列Hadoop、Spark服務與Spark-shell窗口:

一文帶你理解並實戰協同過濾!Spark分佈式機器學習系列

1.協同過濾參數

協作過濾通常用於推薦系統。這些技術旨在填充用戶項關聯矩陣的缺失條目。spark.mllib當前支持基於模型的協作過濾,其中用戶和產品由一小部分潛在因素來描述,這些潛在因素可用於預測缺失條目。spark.mllib使用交替最小二乘(ALS)算法來學習這些潛在因素。spark.mllib中的實現具有以下參數:

  • numBlocks 是用於並行化計算的塊數(設置為-1以進行自動配置)。
  • Rank 是要使用的功能數量(也稱為潛在因素數量)。
  • Iterations 是要運行的ALS迭代次數。ALS通常會在20次或更短的迭代中收斂到一個合理的解決方案。
  • lambda 在ALS中指定正則化參數。
  • hiddenPrefs 指定是使用顯式反饋ALS變體,還是使用適用於隱式反饋數據的變體。
  • Alpha 是適用於ALS的隱式反饋變量的參數,用於控制偏好觀察中的基線置信度。
一文帶你理解並實戰協同過濾!Spark分佈式機器學習系列

2.交替最小二乘(ALS)

ALS是交替最小二乘(alternating least squares)的簡稱。在機器學習中,ALS特指使用交替最小二乘求解的一個協同推薦算法。它通過觀察到的所有用戶給商品的打分,來推斷每個用戶的喜好並向用戶推薦適合的商品。舉個例子,我們看下面一個8*8的用戶打分矩陣。

一文帶你理解並實戰協同過濾!Spark分佈式機器學習系列

這個矩陣的每一行代表一個用戶(u1,u2,…,u8),每一列代表一個商品(v1,v2,…,v8),用戶的打分為1-9分。但例如(u6,v5)需要我們去推測,如果以數獨的方式來解決這個問題,每添加一條規則,就讓整個系統的自由度下降一個量級。當我們滿足所有的規則時,整個系統的自由度就降為1了,也就得出了唯一的結果。

對於上面的打分矩陣,我們需要提出一個限制其自由度的合理假設,使得我們可以通過觀察已有打分來猜測未知打分。

ALS的核心就是這樣一個假設:打分矩陣是近似低秩的。換句話說,就是一個m*n的打分矩陣可以由分解的兩個小矩陣U(m*k)和V(k*n)的乘積來近似,這就是ALS的矩陣分解方法。這樣我們把系統的自由度從O(mn)降到了O((m+n)k)。

3.Spark實現ALS原理

Spark利用交換最小二乘解決矩陣分解問題分兩種情況:數據集是顯式反饋和數據集是隱式反饋。由於隱式反饋算法的原理是在顯示反饋算法原理的基礎上作的修改,所以我們在此只會具體講解數據集為隱式反饋的算法。

從廣義上講,推薦系統基於兩種不同的策略:基於內容的方法和基於協同過濾的方法。Spark中使用協同過濾的方式。協同過濾分析用戶以及用戶相關的產品的相關性,用以識別新的用戶-產品相關性。協同過濾系統需要的唯一信息是用戶過去的行為信息,比如對產品的評價信息。協同過濾是領域無關的,所以它可以方便解決基於內容方法難以解決的許多問題。

推薦系統依賴不同類型的輸入數據,最方便的是高質量的顯式反饋數據(例如用戶評星數據),但是顯式反饋數據不一定總是找得到,因此推薦系統可以從更豐富的隱式反饋信息中推測用戶的偏好。隱式反饋類型包括購買歷史、瀏覽歷史、搜索模式甚至鼠標動作。例如,購買同一個作者許多書的用戶可能喜歡這個作者。

瞭解隱式反饋的特點非常重要,因為這些特質使我們避免了直接調用基於顯式反饋的算法。最主要的特點有如下幾種:

  • 沒有負反饋。即很難推測哪個商品用戶不喜歡。這在顯式反饋算法中並不存在,因為用戶明確告訴了我們哪些他喜歡哪些他不喜歡。
  • 隱式反饋是內在的噪音。雖然我們拼命的追蹤用戶行為,但是我們僅僅只是猜測他們的偏好和真實動機。例如,因為這個商品可能作為禮物被購買而用戶並不喜歡它。
  • 顯示反饋的數值值表示偏好(preference),隱式回饋的數值值表示信任(confidence)。基於顯示反饋的系統用星星等級讓用戶表達他們的喜好程度。基於隱式反饋的數值值描述的是動作的頻率,例如用戶購買特定商品的次數。一個較大的值並不能表明更多的偏愛。但是這個值是有用的,它描述了在一個特定觀察中的信任度。
  • 評價隱式反饋推薦系統需要合適的手段。
一文帶你理解並實戰協同過濾!Spark分佈式機器學習系列

4.Spark實例

在以下示例中,我們加載評級數據。每行包含一個用戶,一個產品和一個評分。我們使用默認的ALS.train()方法,該方法假定等級是明確的。我們通過測量評分預測的均方誤差來評估推薦模型。

<code>

import

org.apache.spark.mllib.recommendation.ALS

import

org.apache.spark.mllib.recommendation.MatrixFactorizationModel

import

org.apache.spark.mllib.recommendation.Rating val data = sc.textFile(

"data/mllib/als/test.data"

) val ratings = data.

map

(

_

.

split

(',') match {

case

Array

(user, item, rate) =>

Rating

(user.toInt, item.toInt, rate.toDouble) }) val rank =

10

val numIterations =

10

val model =

ALS

.train(ratings, rank, numIterations,

0.01

) val usersProducts = ratings.

map

{

case

Rating

(user, product, rate) => (user, product) } val predictions = model.predict(usersProducts).

map

{

case

Rating

(user, product, rate) => ((user, product), rate) } val ratesAndPreds = ratings.

map

{

case

Rating

(user, product, rate) => ((user, product), rate) }.

join

(predictions) val

MSE

= ratesAndPreds.

map

{

case

((user, product), (r1, r2)) => val err = (r1 - r2) err * err }.mean()

println

(s

"Mean Squared Error = $MSE"

) model.save(sc,

"target/tmp/myCollaborativeFilter"

) val sameModel =

MatrixFactorizationModel

.load(sc,

"target/tmp/myCollaborativeFilter"

) /<code>
一文帶你理解並實戰協同過濾!Spark分佈式機器學習系列

5.源碼解析

從代碼中我們知道,訓練模型用到了ALS.scala文件中的train方法,下面我們將詳細介紹train方法的實現。在此之前,我們先了解一下train方法的參數表示的含義。

<code>

def

train(

ratings:

RDD[Rating[ID]],

//訓練數據

rank:

Int

=

10

,

//隱含特徵數

numUserBlocks:

Int

=

10

,

//分區數

numItemBlocks:

Int

=

10

,

maxIter:

Int

=

10

,

//迭代次數

regParam:

Double

=

1.0

,

implicitPrefs:

Boolean

=

false

,

alpha:

Double

=

1.0

,

nonnegative:

Boolean

=

false

,

intermediateRDDStorageLevel:

StorageLevel

=

StorageLevel.MEMORY_AND_DISK,

finalRDDStorageLevel:

StorageLevel

=

StorageLevel.MEMORY_AND_DISK,

checkpointInterval:

Int

=

10

,

seed:

Long

=

0L):

MatrixFactorizationModel

/<code>

獲取inblocks和outblocks數據是數據處理的重點。我們知道,通信複雜度是分佈式實現一個算法時要重點考慮的問題,不同的實現可能會對性能產生很大的影響。我們假設最壞的情況:即求解商品需要的所有用戶特徵都需要從其它節點獲得。求解v1需要獲得u1,u2,求解v2需要獲得u1,u2,u3等,在這種假設下,每步迭代所需的交換數據量是O(m*rank),其中m表示所有觀察到的打分集大小,rank表示特徵數量。下面的代碼用來分別獲取用戶和商品的InBlock和OutBlock。

<code>val (userInBlocks, userOutBlocks) =makeBlocks(

"user"

, blockRatings, userPart, itemPart,intermediateRDDStorageLevel) val swappedBlockRatings = blockRatings.map {

case

(

(userBlockId, itemBlockId

),

RatingBlock

(

userIds, itemIds, localRatings

)) =>

((itemBlockId, userBlockId), RatingBlock(itemIds, userIds, localRatings)) } val (itemInBlocks, itemOutBlocks) =makeBlocks(

"item"

, swappedBlockRatings, itemPart, userPart,intermediateRDDStorageLevel)/<code>

Spark協同過濾的內容至此結束,有關Spark的基礎文章可參考前文:

想要入門大數據?這篇文章不得不看!Spark源碼分析系列

阿里是怎麼做大數據的?淘寶怎麼能承載雙11?大數據之眸告訴你

Spark分佈式機器學習源碼分析:如何用分佈式集群構建線性模型?

高頻面經總結:最全大數據+AI方向面試100題(附答案詳解)

Spark分佈式機器學習系列:一文帶你理解並實戰樸素貝葉斯!

Spark分佈式機器學習系列:一文帶你理解並實戰決策樹模型!

Spark分佈式機器學習系列:一文帶你理解並實戰集成樹模型!


分享到:


相關文章: