一文带你理解并实战协同过滤!Spark分布式机器学习系列

​ Spark是一个极为优秀的大数据框架,在大数据批处理上基本无人能敌,流处理上也有一席之地,机器学习则是当前正火热AI人工智能的驱动引擎,在大数据场景下如何发挥AI技术成为优秀的大数据挖掘工程师必备技能。

本文采用的组件版本为:Ubuntu 19.10、Jdk 1.8.0_241、Scala 2.11.12、Hadoop 3.2.1、Spark 2.4.5,老规矩先开启一系列Hadoop、Spark服务与Spark-shell窗口:

一文带你理解并实战协同过滤!Spark分布式机器学习系列

1.协同过滤参数

协作过滤通常用于推荐系统。这些技术旨在填充用户项关联矩阵的缺失条目。spark.mllib当前支持基于模型的协作过滤,其中用户和产品由一小部分潜在因素来描述,这些潜在因素可用于预测缺失条目。spark.mllib使用交替最小二乘(ALS)算法来学习这些潜在因素。spark.mllib中的实现具有以下参数:

  • numBlocks 是用于并行化计算的块数(设置为-1以进行自动配置)。
  • Rank 是要使用的功能数量(也称为潜在因素数量)。
  • Iterations 是要运行的ALS迭代次数。ALS通常会在20次或更短的迭代中收敛到一个合理的解决方案。
  • lambda 在ALS中指定正则化参数。
  • hiddenPrefs 指定是使用显式反馈ALS变体,还是使用适用于隐式反馈数据的变体。
  • Alpha 是适用于ALS的隐式反馈变量的参数,用于控制偏好观察中的基线置信度。
一文带你理解并实战协同过滤!Spark分布式机器学习系列

2.交替最小二乘(ALS)

ALS是交替最小二乘(alternating least squares)的简称。在机器学习中,ALS特指使用交替最小二乘求解的一个协同推荐算法。它通过观察到的所有用户给商品的打分,来推断每个用户的喜好并向用户推荐适合的商品。举个例子,我们看下面一个8*8的用户打分矩阵。

一文带你理解并实战协同过滤!Spark分布式机器学习系列

这个矩阵的每一行代表一个用户(u1,u2,…,u8),每一列代表一个商品(v1,v2,…,v8),用户的打分为1-9分。但例如(u6,v5)需要我们去推测,如果以数独的方式来解决这个问题,每添加一条规则,就让整个系统的自由度下降一个量级。当我们满足所有的规则时,整个系统的自由度就降为1了,也就得出了唯一的结果。

对于上面的打分矩阵,我们需要提出一个限制其自由度的合理假设,使得我们可以通过观察已有打分来猜测未知打分。

ALS的核心就是这样一个假设:打分矩阵是近似低秩的。换句话说,就是一个m*n的打分矩阵可以由分解的两个小矩阵U(m*k)和V(k*n)的乘积来近似,这就是ALS的矩阵分解方法。这样我们把系统的自由度从O(mn)降到了O((m+n)k)。

3.Spark实现ALS原理

Spark利用交换最小二乘解决矩阵分解问题分两种情况:数据集是显式反馈和数据集是隐式反馈。由于隐式反馈算法的原理是在显示反馈算法原理的基础上作的修改,所以我们在此只会具体讲解数据集为隐式反馈的算法。

从广义上讲,推荐系统基于两种不同的策略:基于内容的方法和基于协同过滤的方法。Spark中使用协同过滤的方式。协同过滤分析用户以及用户相关的产品的相关性,用以识别新的用户-产品相关性。协同过滤系统需要的唯一信息是用户过去的行为信息,比如对产品的评价信息。协同过滤是领域无关的,所以它可以方便解决基于内容方法难以解决的许多问题。

推荐系统依赖不同类型的输入数据,最方便的是高质量的显式反馈数据(例如用户评星数据),但是显式反馈数据不一定总是找得到,因此推荐系统可以从更丰富的隐式反馈信息中推测用户的偏好。隐式反馈类型包括购买历史、浏览历史、搜索模式甚至鼠标动作。例如,购买同一个作者许多书的用户可能喜欢这个作者。

了解隐式反馈的特点非常重要,因为这些特质使我们避免了直接调用基于显式反馈的算法。最主要的特点有如下几种:

  • 没有负反馈。即很难推测哪个商品用户不喜欢。这在显式反馈算法中并不存在,因为用户明确告诉了我们哪些他喜欢哪些他不喜欢。
  • 隐式反馈是内在的噪音。虽然我们拼命的追踪用户行为,但是我们仅仅只是猜测他们的偏好和真实动机。例如,因为这个商品可能作为礼物被购买而用户并不喜欢它。
  • 显示反馈的数值值表示偏好(preference),隐式回馈的数值值表示信任(confidence)。基于显示反馈的系统用星星等级让用户表达他们的喜好程度。基于隐式反馈的数值值描述的是动作的频率,例如用户购买特定商品的次数。一个较大的值并不能表明更多的偏爱。但是这个值是有用的,它描述了在一个特定观察中的信任度。
  • 评价隐式反馈推荐系统需要合适的手段。
一文带你理解并实战协同过滤!Spark分布式机器学习系列

4.Spark实例

在以下示例中,我们加载评级数据。每行包含一个用户,一个产品和一个评分。我们使用默认的ALS.train()方法,该方法假定等级是明确的。我们通过测量评分预测的均方误差来评估推荐模型。

<code>

import

org.apache.spark.mllib.recommendation.ALS

import

org.apache.spark.mllib.recommendation.MatrixFactorizationModel

import

org.apache.spark.mllib.recommendation.Rating val data = sc.textFile(

"data/mllib/als/test.data"

) val ratings = data.

map

(

_

.

split

(',') match {

case

Array

(user, item, rate) =>

Rating

(user.toInt, item.toInt, rate.toDouble) }) val rank =

10

val numIterations =

10

val model =

ALS

.train(ratings, rank, numIterations,

0.01

) val usersProducts = ratings.

map

{

case

Rating

(user, product, rate) => (user, product) } val predictions = model.predict(usersProducts).

map

{

case

Rating

(user, product, rate) => ((user, product), rate) } val ratesAndPreds = ratings.

map

{

case

Rating

(user, product, rate) => ((user, product), rate) }.

join

(predictions) val

MSE

= ratesAndPreds.

map

{

case

((user, product), (r1, r2)) => val err = (r1 - r2) err * err }.mean()

println

(s

"Mean Squared Error = $MSE"

) model.save(sc,

"target/tmp/myCollaborativeFilter"

) val sameModel =

MatrixFactorizationModel

.load(sc,

"target/tmp/myCollaborativeFilter"

) /<code>
一文带你理解并实战协同过滤!Spark分布式机器学习系列

5.源码解析

从代码中我们知道,训练模型用到了ALS.scala文件中的train方法,下面我们将详细介绍train方法的实现。在此之前,我们先了解一下train方法的参数表示的含义。

<code>

def

train(

ratings:

RDD[Rating[ID]],

//训练数据

rank:

Int

=

10

,

//隐含特征数

numUserBlocks:

Int

=

10

,

//分区数

numItemBlocks:

Int

=

10

,

maxIter:

Int

=

10

,

//迭代次数

regParam:

Double

=

1.0

,

implicitPrefs:

Boolean

=

false

,

alpha:

Double

=

1.0

,

nonnegative:

Boolean

=

false

,

intermediateRDDStorageLevel:

StorageLevel

=

StorageLevel.MEMORY_AND_DISK,

finalRDDStorageLevel:

StorageLevel

=

StorageLevel.MEMORY_AND_DISK,

checkpointInterval:

Int

=

10

,

seed:

Long

=

0L):

MatrixFactorizationModel

/<code>

获取inblocks和outblocks数据是数据处理的重点。我们知道,通信复杂度是分布式实现一个算法时要重点考虑的问题,不同的实现可能会对性能产生很大的影响。我们假设最坏的情况:即求解商品需要的所有用户特征都需要从其它节点获得。求解v1需要获得u1,u2,求解v2需要获得u1,u2,u3等,在这种假设下,每步迭代所需的交换数据量是O(m*rank),其中m表示所有观察到的打分集大小,rank表示特征数量。下面的代码用来分别获取用户和商品的InBlock和OutBlock。

<code>val (userInBlocks, userOutBlocks) =makeBlocks(

"user"

, blockRatings, userPart, itemPart,intermediateRDDStorageLevel) val swappedBlockRatings = blockRatings.map {

case

(

(userBlockId, itemBlockId

),

RatingBlock

(

userIds, itemIds, localRatings

)) =>

((itemBlockId, userBlockId), RatingBlock(itemIds, userIds, localRatings)) } val (itemInBlocks, itemOutBlocks) =makeBlocks(

"item"

, swappedBlockRatings, itemPart, userPart,intermediateRDDStorageLevel)/<code>

Spark协同过滤的内容至此结束,有关Spark的基础文章可参考前文:

想要入门大数据?这篇文章不得不看!Spark源码分析系列

阿里是怎么做大数据的?淘宝怎么能承载双11?大数据之眸告诉你

Spark分布式机器学习源码分析:如何用分布式集群构建线性模型?

高频面经总结:最全大数据+AI方向面试100题(附答案详解)

Spark分布式机器学习系列:一文带你理解并实战朴素贝叶斯!

Spark分布式机器学习系列:一文带你理解并实战决策树模型!

Spark分布式机器学习系列:一文带你理解并实战集成树模型!


分享到:


相關文章: