跳至主要内容

基于Spark的机器学习算法开发和调优

SimRank
1. 算法介绍和原理
1.1 简介
simrank算法是计算相似度的算法,它的作用和协同过滤相似,我们可以使用这个算法来挖掘商品之间的相似度。
  • 图1
1.2 算法原理
simrank是一个迭代算法,算法迭代过程使用如下公式:
  • 公式1
S(a, b)——表示(a, b)的相似度
C——表示衰减参数
I(a)——表示结点a的入度数
从公式可以看出,两个结点之间的相似度是从与它们相邻点的相似度累加而获得的。现在设n为结点数,e为一个结点的邻接边数,则每一轮迭代的时间复杂度都为O(n^2 * e^2)。当结点数目达到100万个,边数满足幂律分布的时候,计算量将会相当巨大,因此我们需要一些并发计算工具来实现这个算法。这里我们使用了Spark,原因是Spark十分适合做迭代算法的计算,另外编写Spark作业的代码量相当少。
2. 矩阵相乘
从simrank的公式可以看出,simrank的计算可以拆分成两个矩阵相乘的过程。
  • 公式2
S——相似矩阵
L——邻接矩阵
N(L)——对矩阵L进行列规范化
根据上面的公式变形,我们就可以使用并发的矩阵相乘计算方法对simrank的计算并发化。
2.1 k行(列)作为一个分块
  • 图2
第一次矩阵相乘的时间复杂度为O(e),空间复杂度为O(n + e),shuffle空间O(n^2 * e) 第二次矩阵相乘的时间复杂度为O(e),空间复杂度为O(n + e),shuffle空间O(n^2 * e)
在Spark计算中,每个分块会串行计算若干组切分,Spark的分块数不能太大,这样会导致Shuffle产生的文件数变多,后面拉取shuffle数据的过程也比较慢。因此这里的单机时间和空间复杂度都要乘上一个k(表示每个分块的大小),变成O(k*e)和O(k*(n+e)),而shuffle的空间复杂度不变。由于分块不能过大,那么k的数值就会变大,从而导致计算过程中,很多机器都会因内存不足而挂掉。
2.2 子矩阵切分
由于这样的拆分方法会导致单机的空间复杂度上升,于是有了这个子矩阵拆分的方法。我们把一个n*n矩阵切分成多个m*m的子矩阵。这样拆分之后,我们的程序又变成单机版的矩阵相乘了,在这样的矩阵相乘中,里面每个元素就的相乘和相加都变成子矩阵的相乘和相加。
  • 图3
改造后,时间举证相乘的时间复杂度变为O((n/m)^3 * k * m),空间复杂度O(2*k*m),shuffle空间复杂度为O(m^3)。 这种做法就是用时间换取空间,但是这样做就有利于算法的稳定执行,而不需要担心有机器因内存不足而挂掉。
3. 实现与调优
  • checkpoint
由于Spark在计算过程中进行shuffle数据时都是写入机器本身的硬盘,也没有做任何清理。因此在矩阵相乘这种会产生大量shuffle数据的计算中,很有可能出现RDD的数据还没有被使用,机器就挂掉了。因此需要使用checkpoint把计算出来的中间数据存放到HDFS上。
经验总结
1. 我们的开发规范
1.1 开发工具
我们使用Intellij idea作为开发工具,用scala作为开发语言,用maven做项目管理。这样的配搭是我们目前为止摸索出的一个比较舒服的方法。原因是:
  1. idea对scala和maven的支持相当友好,方便做单元测试和单机测试。
  2. scala是Spark的开发语言,这样就能完全使用Spark的特性,而不会出现像Java和Python缺少一些函数的支持。
1.2 调优参数注入
由于我们在Yarn上使用Spark,由于生产和安全问题,我们只能用yarn-standalone的模式提交作业,因此我们的调优参数都是使用另外的配置文件注入,而不是直接写在作业代码中。下面是我们常用的调优参数:
  1. spark.akka.frameSize=50 
  2. action后收集数据时有些算法不可避免的需要collect大量数据
  3. spark.storage.memoryFraction=0.2
  4. 这里表示使用内存缓存数据时使用总内存的大小,由于一般cache的数据并不会有太多,所以会预留更多空间给shuffle和计算使用
  5. spark.default.parallelism=100 
  6. 我们把默认并发度设置成100,这样跟适合我们的数据量。当然这个值也可以在transformation的方法里面指定。
2. 常见问题
我们使用Yarn作为Spark的资源调度器,因此我们也需要遵守Yarn的规则。
2.1 监控
在Yarn上执行Spark的时候,我们可以清楚看到Spark的监控状况,而当任务结束后,Spark启动的监控也会关闭。如果需要查看执行状况就需要进入Yarn的监控页面查看对应作业的日志。另外,Yarn上是允许一个物理机启动多个container,那么从Spark监控直接查看Container的日志将会十分困难,这时候就需要对源码做一定的修改来优化日志查看。
2.2 Yarn资源使用
我们一般都会使用下面几个参数:
  • --worker-memory
  • --worker-cores
  • --num-workers
  • --master-memory

这里有一个需要注意的地方,就是一个worker中,是多个core共享memory的。如果要使得作业能稳定执行,worker-cores这个参数应该设置得约小约好,然后配合着申请适当的内存,提高num-workers的数量就能提搞并行度。

评论

此博客中的热门博文

Spark 矩阵相乘实现

矩阵相乘计算的意义十分重要,比如我们做数据分析经常使用到的join操作就可以理解成为矩阵相乘的一个部分,矩阵相乘在很多分布式计算框架都有自己的实现,这些实现也可以根据不同大小的矩阵做不同的优化,比如在MapReduce上就有两种基本的实现: 大矩阵乘小矩阵 我可以把小矩阵放到DistrubutedCache,让每个mapper读取,这就是hive的MapJoin的实现。 两个大矩阵相乘 相乘中两个矩阵都是超大矩阵的话,为了减少MapReduce过程中产生的巨大数据,使用的带宽。都会采用矩阵分块的做法,以下会详细介绍这种实现方法。 Spark介绍 官网介绍 超大矩阵拆分计算方法 单机(单线程)矩阵相乘 假设有矩阵 A , B ,相乘的结果为 C ,那么相乘的伪代码: C = 0 for(i <- 0 to A.lenght) for(k <- 0 to A[i].lenght) if(A[i][k]!=0) for(j <- B[k].lenght) { C[i][j] += A[i][k] * B[k][j] } 单机的计算中,时间复杂度是 O(n^3),如果 n 增长到一定程度的时候,那么计算用时就会非常大,所以下面会说一下并行计算方法。 并行矩阵相乘 在计算的过程中,我们很容易发现每个 A[i][k] * B[k][j] 都可以单独计算,因此我也可以单独计算这些组合,最后做一个累计就可以获得 C 矩阵了,相乘伪代码如下: A = ((row, column), value) => (column, (row, value)) B = ((row, column), value) => (row, (column, value)) Temp = A.join(B) => (k, (rowA, valueA), (columnB, valueB)) => ((rowA, columnB), valueA * valueB) => ((rowC, columnC), valueC) C = Temp.reduceByKey =...

iphone 自动打包脚本

最近做ios开发,经常需要给老大打ipa包,这个虽然在xcode中编译并打包是很简单的事,不过每次都得花几分钟的时间做一些手动的放入Payload并压缩成zip包的操作。比较麻烦的是,在开发过程中,突然就说要一个可以执行的包做测试。那么,思路断了,正在写的代码要注释掉,这样持续下去浪费的时间会很多,所以还是需要写一个打包脚本。 打包具体用到的命令是这些: xcodebuild: 主要用于编译项目 xcrun: 主要用于打ipa包 具体打包流程就是编译,然后打一个发布包,一个ipa包,其实用脚本来说话就好了。另外,我用一个conf.dat来存放target和configuration,这些都在xcode里面指定好了,用xxx:xxx这样的格式来存放,xcodebuild在编译的时候会自动找到对应的配置。 打包脚本如下: #!/bin/sh basePath=`pwd` distDir="target" distDir="${basePath}/${distDir}" rm -rdf "$distDir" mkdir -p "$distDir" baseName="xxx" #.app 的名字 projectDir=$(cd ../mobile/xxx; pwd) # 进入xcode工程目录 cd $projectDir for line in $(cat ${basePath}/conf.dat) do targetName=`echo $line | cut -f1 -d':'` conf=`echo $line | cut -f2 -d':'` releaseDir="${projectDir}/build/${conf}-iphoneos" rm -rdf "$releaseDir" echo "======build ${baseName}.app start..." echo "======clean ${conf}..." xcodebuild clean -configuration "$...

【转】ELO-对弈与排名(有触感的排名)

原文: https://www.cnblogs.com/leoin2012/p/4854442.html ELO介绍 ELO等级分制度 是指由 匈牙利 裔 美国 物理学家 Elo创建的一个衡量各类对弈活动水平的评价方法,是当今对弈水平评估的公认的权威方法。被广泛用于 国际象棋 、 围棋 、 足球 、 篮球 等运动。网络游戏 英雄联盟 、 魔兽世界 内的竞技对战系统也采用此分级制度。 历史 ELO等级分制度是基于 统计学 的一个评估棋手水平的方法。 美国 国际象棋 协会在1960年首先使用这种计分方法。由于它比先前的方法更公平客观,这种方法很快流行开来。1970年国际棋联正式开始使用等级分制度。 Elo模型原先采用 正态分布 。但是实践显明棋手的表现并非呈正态分布,所以现在的等级分计分系统通常使用的是 Logistic distribution 。 计分方法 假设棋手A和B的当前等级分分别为 和 ,则按Logistic distribution A对B的胜率期望值当为 类似B对A的胜率为 假如一位棋手在比赛中的真实得分 (胜=1分,和=0.5分,负=0分)和他的胜率期望值 不同,则他的等级分要作相应的调整。具体的数学公式为 公式中 和 分别为棋手调整前后的等级分。在大师级比赛中 通常为16。 例如,棋手A等级分为1613,与等级分为1573的棋手B战平。若K取32,则A的胜率期望值为 ,约为0.5573,因而A的新等级分为1613 + 32 · (0.5 − 0.5573) = 1611.166 国际象棋中的等级分 国际象棋中,等级分和棋联称号的大致对应为 2500分以上:国际特级大师 2400-2499分:国际大师 2300-2399分:棋联大师