Spark是一个极为优秀的大数据框架,在大数据批处理上基本无人能敌,流处理上也有一席之地,机器学习则是当前正火热AI人工智能的驱动引擎,在大数据场景下如何发挥AI技术成为优秀的大数据挖掘工程师必备技能。
本文采用的组件版本为:Ubuntu 19.10、Jdk 1.8.0_241、Scala 2.11.12、Hadoop 3.2.1、Spark 2.4.5,老规矩先开启一系列Hadoop、Spark服务与Spark-shell窗口:
1.协同过滤参数
协作过滤通常用于推荐系统。这些技术旨在填充用户项关联矩阵的缺失条目。spark.mllib当前支持基于模型的协作过滤,其中用户和产品由一小部分潜在因素来描述,这些潜在因素可用于预测缺失条目。spark.mllib使用交替最小二乘(ALS)算法来学习这些潜在因素。spark.mllib中的实现具有以下参数:
- numBlocks 是用于并行化计算的块数(设置为-1以进行自动配置)。
- Rank 是要使用的功能数量(也称为潜在因素数量)。
- Iterations 是要运行的ALS迭代次数。ALS通常会在20次或更短的迭代中收敛到一个合理的解决方案。
- lambda 在ALS中指定正则化参数。
- hiddenPrefs 指定是使用显式反馈ALS变体,还是使用适用于隐式反馈数据的变体。
- Alpha 是适用于ALS的隐式反馈变量的参数,用于控制偏好观察中的基线置信度。
2.交替最小二乘(ALS)
ALS是交替最小二乘(alternating least squares)的简称。在机器学习中,ALS特指使用交替最小二乘求解的一个协同推荐算法。它通过观察到的所有用户给商品的打分,来推断每个用户的喜好并向用户推荐适合的商品。举个例子,我们看下面一个8*8的用户打分矩阵。
这个矩阵的每一行代表一个用户(u1,u2,…,u8),每一列代表一个商品(v1,v2,…,v8),用户的打分为1-9分。但例如(u6,v5)需要我们去推测,如果以数独的方式来解决这个问题,每添加一条规则,就让整个系统的自由度下降一个量级。当我们满足所有的规则时,整个系统的自由度就降为1了,也就得出了唯一的结果。
对于上面的打分矩阵,我们需要提出一个限制其自由度的合理假设,使得我们可以通过观察已有打分来猜测未知打分。
ALS的核心就是这样一个假设:打分矩阵是近似低秩的。换句话说,就是一个m*n的打分矩阵可以由分解的两个小矩阵U(m*k)和V(k*n)的乘积来近似,这就是ALS的矩阵分解方法。这样我们把系统的自由度从O(mn)降到了O((m+n)k)。
3.Spark实现ALS原理
Spark利用交换最小二乘解决矩阵分解问题分两种情况:数据集是显式反馈和数据集是隐式反馈。由于隐式反馈算法的原理是在显示反馈算法原理的基础上作的修改,所以我们在此只会具体讲解数据集为隐式反馈的算法。
从广义上讲,推荐系统基于两种不同的策略:基于内容的方法和基于协同过滤的方法。Spark中使用协同过滤的方式。协同过滤分析用户以及用户相关的产品的相关性,用以识别新的用户-产品相关性。协同过滤系统需要的唯一信息是用户过去的行为信息,比如对产品的评价信息。协同过滤是领域无关的,所以它可以方便解决基于内容方法难以解决的许多问题。
推荐系统依赖不同类型的输入数据,最方便的是高质量的显式反馈数据(例如用户评星数据),但是显式反馈数据不一定总是找得到,因此推荐系统可以从更丰富的隐式反馈信息中推测用户的偏好。隐式反馈类型包括购买历史、浏览历史、搜索模式甚至鼠标动作。例如,购买同一个作者许多书的用户可能喜欢这个作者。
了解隐式反馈的特点非常重要,因为这些特质使我们避免了直接调用基于显式反馈的算法。最主要的特点有如下几种:
- 没有负反馈。即很难推测哪个商品用户不喜欢。这在显式反馈算法中并不存在,因为用户明确告诉了我们哪些他喜欢哪些他不喜欢。
- 隐式反馈是内在的噪音。虽然我们拼命的追踪用户行为,但是我们仅仅只是猜测他们的偏好和真实动机。例如,因为这个商品可能作为礼物被购买而用户并不喜欢它。
- 显示反馈的数值值表示偏好(preference),隐式回馈的数值值表示信任(confidence)。基于显示反馈的系统用星星等级让用户表达他们的喜好程度。基于隐式反馈的数值值描述的是动作的频率,例如用户购买特定商品的次数。一个较大的值并不能表明更多的偏爱。但是这个值是有用的,它描述了在一个特定观察中的信任度。
- 评价隐式反馈推荐系统需要合适的手段。
4.Spark实例
在以下示例中,我们加载评级数据。每行包含一个用户,一个产品和一个评分。我们使用默认的ALS.train()方法,该方法假定等级是明确的。我们通过测量评分预测的均方误差来评估推荐模型。
<code>import
org.apache.spark.mllib.recommendation.ALSimport
org.apache.spark.mllib.recommendation.MatrixFactorizationModelimport
org.apache.spark.mllib.recommendation.Rating val data = sc.textFile("data/mllib/als/test.data"
) val ratings = data.map
(_
.split
(',') match {case
Array
(user, item, rate) =>Rating
(user.toInt, item.toInt, rate.toDouble) }) val rank =10
val numIterations =10
val model =ALS
.train(ratings, rank, numIterations,0.01
) val usersProducts = ratings.map
{case
Rating
(user, product, rate) => (user, product) } val predictions = model.predict(usersProducts).map
{case
Rating
(user, product, rate) => ((user, product), rate) } val ratesAndPreds = ratings.map
{case
Rating
(user, product, rate) => ((user, product), rate) }.join
(predictions) valMSE
= ratesAndPreds.map
{case
((user, product), (r1, r2)) => val err = (r1 - r2) err * err }.mean()println
(s"Mean Squared Error = $MSE"
) model.save(sc,"target/tmp/myCollaborativeFilter"
) val sameModel =MatrixFactorizationModel
.load(sc,"target/tmp/myCollaborativeFilter"
) /<code>
5.源码解析
从代码中我们知道,训练模型用到了ALS.scala文件中的train方法,下面我们将详细介绍train方法的实现。在此之前,我们先了解一下train方法的参数表示的含义。
<code>def
train(
ratings:
RDD[Rating[ID]],
//训练数据
rank:
Int
=
10
,
//隐含特征数
numUserBlocks:
Int
=
10
,
//分区数
numItemBlocks:
Int
=
10
,
maxIter:
Int
=
10
,
//迭代次数
regParam:
Double
=
1.0
,
implicitPrefs:
Boolean
=
false
,
alpha:
Double
=
1.0
,
nonnegative:
Boolean
=
false
,
intermediateRDDStorageLevel:
StorageLevel
=
StorageLevel.MEMORY_AND_DISK,
finalRDDStorageLevel:
StorageLevel
=
StorageLevel.MEMORY_AND_DISK,
checkpointInterval:
Int
=
10
,
seed:
Long
=
0L):
MatrixFactorizationModel
/<code>
获取inblocks和outblocks数据是数据处理的重点。我们知道,通信复杂度是分布式实现一个算法时要重点考虑的问题,不同的实现可能会对性能产生很大的影响。我们假设最坏的情况:即求解商品需要的所有用户特征都需要从其它节点获得。求解v1需要获得u1,u2,求解v2需要获得u1,u2,u3等,在这种假设下,每步迭代所需的交换数据量是O(m*rank),其中m表示所有观察到的打分集大小,rank表示特征数量。下面的代码用来分别获取用户和商品的InBlock和OutBlock。
<code>val (userInBlocks, userOutBlocks) =makeBlocks("user"
, blockRatings, userPart, itemPart,intermediateRDDStorageLevel) val swappedBlockRatings = blockRatings.map {case
(
(userBlockId, itemBlockId
),RatingBlock
(userIds, itemIds, localRatings
)) => ((itemBlockId, userBlockId), RatingBlock(itemIds, userIds, localRatings)) } val (itemInBlocks, itemOutBlocks) =makeBlocks("item"
, swappedBlockRatings, itemPart, userPart,intermediateRDDStorageLevel)/<code>
Spark协同过滤的内容至此结束,有关Spark的基础文章可参考前文:
阿里是怎么做大数据的?淘宝怎么能承载双11?大数据之眸告诉你
Spark分布式机器学习源码分析:如何用分布式集群构建线性模型?
高频面经总结:最全大数据+AI方向面试100题(附答案详解)
Spark分布式机器学习系列:一文带你理解并实战朴素贝叶斯!