Spark是一個極為優秀的大數據框架,在大數據批處理上基本無人能敵,流處理上也有一席之地,機器學習則是當前正火熱AI人工智能的驅動引擎,在大數據場景下如何發揮AI技術成為優秀的大數據挖掘工程師必備技能。
本文采用的組件版本為:Ubuntu 19.10、Jdk 1.8.0_241、Scala 2.11.12、Hadoop 3.2.1、Spark 2.4.5,老規矩先開啟一系列Hadoop、Spark服務與Spark-shell窗口:
1.協同過濾參數
協作過濾通常用於推薦系統。這些技術旨在填充用戶項關聯矩陣的缺失條目。spark.mllib當前支持基於模型的協作過濾,其中用戶和產品由一小部分潛在因素來描述,這些潛在因素可用於預測缺失條目。spark.mllib使用交替最小二乘(ALS)算法來學習這些潛在因素。spark.mllib中的實現具有以下參數:
numBlocks 是用於並行化計算的塊數(設置為-1以進行自動配置)。Rank 是要使用的功能數量(也稱為潛在因素數量)。Iterations 是要運行的ALS迭代次數。ALS通常會在20次或更短的迭代中收斂到一個合理的解決方案。lambda 在ALS中指定正則化參數。hiddenPrefs 指定是使用顯式反饋ALS變體,還是使用適用於隱式反饋數據的變體。2.交替最小二乘(ALS)
ALS是交替最小二乘(alternating least squares)的簡稱。在機器學習中,ALS特指使用交替最小二乘求解的一個協同推薦算法。它通過觀察到的所有用戶給商品的打分,來推斷每個用戶的喜好並向用戶推薦適合的商品。舉個例子,我們看下面一個8*8的用戶打分矩陣。
這個矩陣的每一行代表一個用戶(u1,u2,…,u8),每一列代表一個商品(v1,v2,…,v8),用戶的打分為1-9分。但例如(u6,v5)需要我們去推測,如果以數獨的方式來解決這個問題,每添加一條規則,就讓整個系統的自由度下降一個量級。當我們滿足所有的規則時,整個系統的自由度就降為1了,也就得出了唯一的結果。
對於上面的打分矩陣,我們需要提出一個限制其自由度的合理假設,使得我們可以通過觀察已有打分來猜測未知打分。
ALS的核心就是這樣一個假設:打分矩陣是近似低秩的。換句話說,就是一個m*n的打分矩陣可以由分解的兩個小矩陣U(m*k)和V(k*n)的乘積來近似,這就是ALS的矩陣分解方法。這樣我們把系統的自由度從O(mn)降到了O((m+n)k)。
3.Spark實現ALS原理
Spark利用交換最小二乘解決矩陣分解問題分兩種情況:數據集是顯式反饋和數據集是隱式反饋。由於隱式反饋算法的原理是在顯示反饋算法原理的基礎上作的修改,所以我們在此只會具體講解數據集為隱式反饋的算法。
從廣義上講,推薦系統基於兩種不同的策略:基於內容的方法和基於協同過濾的方法。Spark中使用協同過濾的方式。協同過濾分析用戶以及用戶相關的產品的相關性,用以識別新的用戶-產品相關性。協同過濾系統需要的唯一信息是用戶過去的行為信息,比如對產品的評價信息。協同過濾是領域無關的,所以它可以方便解決基於內容方法難以解決的許多問題。
推薦系統依賴不同類型的輸入數據,最方便的是高質量的顯式反饋數據(例如用戶評星數據),但是顯式反饋數據不一定總是找得到,因此推薦系統可以從更豐富的隱式反饋信息中推測用戶的偏好。隱式反饋類型包括購買歷史、瀏覽歷史、搜索模式甚至鼠標動作。例如,購買同一個作者許多書的用戶可能喜歡這個作者。
瞭解隱式反饋的特點非常重要,因為這些特質使我們避免了直接調用基於顯式反饋的算法。最主要的特點有如下幾種:
沒有負反饋。即很難推測哪個商品用戶不喜歡。這在顯式反饋算法中並不存在,因為用戶明確告訴了我們哪些他喜歡哪些他不喜歡。隱式反饋是內在的噪音。雖然我們拼命的追蹤用戶行為,但是我們僅僅只是猜測他們的偏好和真實動機。例如,因為這個商品可能作為禮物被購買而用戶並不喜歡它。顯示反饋的數值值表示偏好(preference),隱式回饋的數值值表示信任(confidence)。基於顯示反饋的系統用星星等級讓用戶表達他們的喜好程度。基於隱式反饋的數值值描述的是動作的頻率,例如用戶購買特定商品的次數。一個較大的值並不能表明更多的偏愛。但是這個值是有用的,它描述了在一個特定觀察中的信任度。評價隱式反饋推薦系統需要合適的手段。4.Spark實例
在以下示例中,我們加載評級數據。每行包含一個用戶,一個產品和一個評分。我們使用默認的ALS.train()方法,該方法假定等級是明確的。我們通過測量評分預測的均方誤差來評估推薦模型。
<code>
import
org.apache.spark.mllib.recommendation.ALSimport
org.apache.spark.mllib.recommendation.MatrixFactorizationModelimport
org.apache.spark.mllib.recommendation.Rating val data = sc.textFile("data/mllib/als/test.data"
) val ratings = data.map
(_
.split
(',') match {case
Array
(user, item, rate) =>Rating
(user.toInt, item.toInt, rate.toDouble) }) val rank =10
val numIterations =10
val model =ALS
.train(ratings, rank, numIterations,0.01
) val usersProducts = ratings.map
{case
Rating
(user, product, rate) => (user, product) } val predictions = model.predict(usersProducts).map
{case
Rating
(user, product, rate) => ((user, product), rate) } val ratesAndPreds = ratings.map
{case
Rating
(user, product, rate) => ((user, product), rate) }.join
(predictions) valMSE
= ratesAndPreds.map
{case
((user, product), (r1, r2)) => val err = (r1 - r2) err * err }.mean()println
(s"Mean Squared Error = $MSE"
) model.save(sc,"target/tmp/myCollaborativeFilter"
) val sameModel =MatrixFactorizationModel
.load(sc,"target/tmp/myCollaborativeFilter"
) /<code>5.源碼解析
從代碼中我們知道,訓練模型用到了ALS.scala文件中的train方法,下面我們將詳細介紹train方法的實現。在此之前,我們先了解一下train方法的參數表示的含義。
<code>
def
train(
ratings:
RDD[Rating[ID]],
//訓練數據
rank:
Int
=
10
,
//隱含特徵數
numUserBlocks:
Int
=
10
,
//分區數
numItemBlocks:
Int
=
10
,
maxIter:
Int
=
10
,
//迭代次數
regParam:
Double
=
1.0
,
implicitPrefs:
Boolean
=
false
,
alpha:
Double
=
1.0
,
nonnegative:
Boolean
=
false
,
intermediateRDDStorageLevel:
StorageLevel
=
StorageLevel.MEMORY_AND_DISK,
finalRDDStorageLevel:
StorageLevel
=
StorageLevel.MEMORY_AND_DISK,
checkpointInterval:
Int
=
10
,
seed:
Long
=
0L):
MatrixFactorizationModel
/<code>獲取inblocks和outblocks數據是數據處理的重點。我們知道,通信複雜度是分佈式實現一個算法時要重點考慮的問題,不同的實現可能會對性能產生很大的影響。我們假設最壞的情況:即求解商品需要的所有用戶特徵都需要從其它節點獲得。求解v1需要獲得u1,u2,求解v2需要獲得u1,u2,u3等,在這種假設下,每步迭代所需的交換數據量是O(m*rank),其中m表示所有觀察到的打分集大小,rank表示特徵數量。下面的代碼用來分別獲取用戶和商品的InBlock和OutBlock。
<code>val (userInBlocks, userOutBlocks) =makeBlocks(
"user"
, blockRatings, userPart, itemPart,intermediateRDDStorageLevel) val swappedBlockRatings = blockRatings.map {case
(
(userBlockId, itemBlockId
),RatingBlock
(userIds, itemIds, localRatings
)) => ((itemBlockId, userBlockId), RatingBlock(itemIds, userIds, localRatings)) } val (itemInBlocks, itemOutBlocks) =makeBlocks("item"
, swappedBlockRatings, itemPart, userPart,intermediateRDDStorageLevel)/<code>Spark協同過濾的內容至此結束,有關Spark的基礎文章可參考前文:
想要入門大數據?這篇文章不得不看!Spark源碼分析系列
阿里是怎麼做大數據的?淘寶怎麼能承載雙11?大數據之眸告訴你
Spark分佈式機器學習源碼分析:如何用分佈式集群構建線性模型?
高頻面經總結:最全大數據+AI方向面試100題(附答案詳解)
Spark分佈式機器學習系列:一文帶你理解並實戰樸素貝葉斯!
Spark分佈式機器學習系列:一文帶你理解並實戰決策樹模型!
Spark分佈式機器學習系列:一文帶你理解並實戰集成樹模型!