SimRank
1. 算法介绍和原理
1.1 简介
simrank算法是计算相似度的算法,它的作用和协同过滤相似,我们可以使用这个算法来挖掘商品之间的相似度。
- 图1
1.2 算法原理
simrank是一个迭代算法,算法迭代过程使用如下公式:
- 公式1
S(a, b)——表示(a, b)的相似度
C——表示衰减参数
I(a)——表示结点a的入度数
从公式可以看出,两个结点之间的相似度是从与它们相邻点的相似度累加而获得的。现在设n为结点数,e为一个结点的邻接边数,则每一轮迭代的时间复杂度都为O(n^2 * e^2)。当结点数目达到100万个,边数满足幂律分布的时候,计算量将会相当巨大,因此我们需要一些并发计算工具来实现这个算法。这里我们使用了Spark,原因是Spark十分适合做迭代算法的计算,另外编写Spark作业的代码量相当少。
2. 矩阵相乘
从simrank的公式可以看出,simrank的计算可以拆分成两个矩阵相乘的过程。
- 公式2
S——相似矩阵
L——邻接矩阵
N(L)——对矩阵L进行列规范化
根据上面的公式变形,我们就可以使用并发的矩阵相乘计算方法对simrank的计算并发化。
2.1 k行(列)作为一个分块
- 图2
第一次矩阵相乘的时间复杂度为O(e),空间复杂度为O(n + e),shuffle空间O(n^2 * e) 第二次矩阵相乘的时间复杂度为O(e),空间复杂度为O(n + e),shuffle空间O(n^2 * e)
在Spark计算中,每个分块会串行计算若干组切分,Spark的分块数不能太大,这样会导致Shuffle产生的文件数变多,后面拉取shuffle数据的过程也比较慢。因此这里的单机时间和空间复杂度都要乘上一个k(表示每个分块的大小),变成O(k*e)和O(k*(n+e)),而shuffle的空间复杂度不变。由于分块不能过大,那么k的数值就会变大,从而导致计算过程中,很多机器都会因内存不足而挂掉。
2.2 子矩阵切分
由于这样的拆分方法会导致单机的空间复杂度上升,于是有了这个子矩阵拆分的方法。我们把一个n*n矩阵切分成多个m*m的子矩阵。这样拆分之后,我们的程序又变成单机版的矩阵相乘了,在这样的矩阵相乘中,里面每个元素就的相乘和相加都变成子矩阵的相乘和相加。
- 图3
改造后,时间举证相乘的时间复杂度变为O((n/m)^3 * k * m),空间复杂度O(2*k*m),shuffle空间复杂度为O(m^3)。 这种做法就是用时间换取空间,但是这样做就有利于算法的稳定执行,而不需要担心有机器因内存不足而挂掉。
3. 实现与调优
- checkpoint
由于Spark在计算过程中进行shuffle数据时都是写入机器本身的硬盘,也没有做任何清理。因此在矩阵相乘这种会产生大量shuffle数据的计算中,很有可能出现RDD的数据还没有被使用,机器就挂掉了。因此需要使用checkpoint把计算出来的中间数据存放到HDFS上。
经验总结
1. 我们的开发规范
1.1 开发工具
我们使用Intellij idea作为开发工具,用scala作为开发语言,用maven做项目管理。这样的配搭是我们目前为止摸索出的一个比较舒服的方法。原因是:
- idea对scala和maven的支持相当友好,方便做单元测试和单机测试。
- scala是Spark的开发语言,这样就能完全使用Spark的特性,而不会出现像Java和Python缺少一些函数的支持。
1.2 调优参数注入
由于我们在Yarn上使用Spark,由于生产和安全问题,我们只能用yarn-standalone的模式提交作业,因此我们的调优参数都是使用另外的配置文件注入,而不是直接写在作业代码中。下面是我们常用的调优参数:
- spark.akka.frameSize=50
- action后收集数据时有些算法不可避免的需要collect大量数据
- spark.storage.memoryFraction=0.2
- 这里表示使用内存缓存数据时使用总内存的大小,由于一般cache的数据并不会有太多,所以会预留更多空间给shuffle和计算使用
- spark.default.parallelism=100
- 我们把默认并发度设置成100,这样跟适合我们的数据量。当然这个值也可以在transformation的方法里面指定。
2. 常见问题
我们使用Yarn作为Spark的资源调度器,因此我们也需要遵守Yarn的规则。
2.1 监控
在Yarn上执行Spark的时候,我们可以清楚看到Spark的监控状况,而当任务结束后,Spark启动的监控也会关闭。如果需要查看执行状况就需要进入Yarn的监控页面查看对应作业的日志。另外,Yarn上是允许一个物理机启动多个container,那么从Spark监控直接查看Container的日志将会十分困难,这时候就需要对源码做一定的修改来优化日志查看。
2.2 Yarn资源使用
我们一般都会使用下面几个参数:
- --worker-memory
- --worker-cores
- --num-workers
- --master-memory
这里有一个需要注意的地方,就是一个worker中,是多个core共享memory的。如果要使得作业能稳定执行,worker-cores这个参数应该设置得约小约好,然后配合着申请适当的内存,提高num-workers的数量就能提搞并行度。
评论
发表评论