跳至主要内容

Spark 矩阵相乘实现

矩阵相乘计算的意义十分重要,比如我们做数据分析经常使用到的join操作就可以理解成为矩阵相乘的一个部分,矩阵相乘在很多分布式计算框架都有自己的实现,这些实现也可以根据不同大小的矩阵做不同的优化,比如在MapReduce上就有两种基本的实现:
  1. 大矩阵乘小矩阵
    我可以把小矩阵放到DistrubutedCache,让每个mapper读取,这就是hive的MapJoin的实现。
  2. 两个大矩阵相乘
    相乘中两个矩阵都是超大矩阵的话,为了减少MapReduce过程中产生的巨大数据,使用的带宽。都会采用矩阵分块的做法,以下会详细介绍这种实现方法。


Spark介绍

官网介绍


超大矩阵拆分计算方法


单机(单线程)矩阵相乘

假设有矩阵AB,相乘的结果为C,那么相乘的伪代码:
C = 0
for(i <- 0 to A.lenght)
    for(k <- 0 to A[i].lenght) if(A[i][k]!=0)
        for(j <- B[k].lenght) {
            C[i][j] += A[i][k] * B[k][j]
        }
单机的计算中,时间复杂度是 O(n^3),如果 n 增长到一定程度的时候,那么计算用时就会非常大,所以下面会说一下并行计算方法。


并行矩阵相乘

在计算的过程中,我们很容易发现每个A[i][k] * B[k][j]都可以单独计算,因此我也可以单独计算这些组合,最后做一个累计就可以获得C矩阵了,相乘伪代码如下:
A = ((row, column), value) => (column, (row, value))
B = ((row, column), value) => (row, (column, value))
Temp = A.join(B) => (k, (rowA, valueA), (columnB, valueB)) 
                 => ((rowA, columnB), valueA * valueB)
                 => ((rowC, columnC), valueC)
C = Temp.reduceByKey => ((rowC, columnC), sumC)
  • 矩阵AB的表示方式不是数组的形式,而是一个数据集合的形式,上面的格式就是这个数据集合每一行数据的格式
  • =>表示数据格式的转换
  • joinreduceByKey跟Hive或SQL的语法类似,可以理解成为并发操作


大矩阵拆分

当矩阵AB膨胀到一定大小,比如行列大小均为100万的超大矩阵,那么进行并行相乘的时候就会产生大量的中间数据,以上面伪代码中的Temp为例,中间数据条数将会达到10^18条,如果按照每行数据存储时使用2个Int和1个Double,那么总共使用的存储空间为 10000PB,这个数据量差不多是100亿个A.V.I(你们懂的)视频,这个估计就Google能一次存下来了。
考虑到这种计算难度,或者说还没有比较好能进行并行异步计算的框架,这种矩阵计算就需要拆分成多个小矩阵来做乘法。


如上图,其实矩阵拆分差不多就是把整个大矩阵划分成一个个方块,我们可以分别从AB中随便挑选两个矩阵做并行矩阵相乘,中间数据都不至于超出我们能存储的范围。


Spark实现

选择Spark来写这份代码有两个好处,(当然还有其他好处了)
  1. Spark的编程接口几乎与函数式编程的方式相同,只要梳理好逻辑,就非常容易编写,并且我可以一气呵成的写完所有代码,无需像MapReduce写一堆冗长的代码何定义多个类。
  2. Spark的计算所产生的中间数据都可以使用内存来缓存,只要申请足够的内存就可以加速我的计算了。

矩阵相乘的入口函数如下:

def multiMat[T: ClassManifest](
               spark: SparkContext,
               mat1: RDD[(Int, (Int, Double))],
               mat2: RDD[(Int, (Int, Double))],
               multiFunc: ((T, (Seq[(T, Double)], Seq[(T, Double)]))) => TraversableOnce[((T, T), Double)]
            ): RDD[((Int, Int), Double)] = {
    var res: RDD[((Int, Int), Double)] = null
    val cntRow = mat1.map(item => item._2._1).distinct().count()
    val cntCol = mat2.map(item => item._2._1).distinct().count()
    //拆分矩阵中,每个小矩阵的维度设定为100000
    val blockSize = 100000
    //如果维度少于100000,就没有必要再做矩阵拆分计算了
    if ((cntRow / blockSize) * (cntCol / blockSize) > 1) {
        res = multiMatPartition(spark, mat1, mat2, multiFunc, cntRow, cntCol, blockSize)
    } else {
        res = multiMat0(spark, mat1, mat2, multiFunc, blockSize)
    }
    res
}

分块函数:

def id2Part[T](id: T, partNum: Int)(implicit cm: ClassManifest[T]): Long = {
    (id.hashCode() & 0x7FFFFFFF) % partNum
}

def splitMatrix[T: ClassManifest](spark: SparkContext,
                      mat: RDD[(T, (T, Double))],
                      keyCount: Long,
                      blockSize: Int
                       ): Array[RDD[(T, (T, Double))]] = {
    val keyPart = ((keyCount + blockSize - 1)/ blockSize).toInt
    val bKeyPart = spark.broadcast(keyPart)
    val pMat = mat.map(item => (item._2._1, (item._1, item._2._2)))
    val keyRdds = ArrayBuffer[RDD[(T, (T, Double))]]()
    for (i <- 0 until keyPart) {
        val bI = spark.broadcast(i)
        //hash的方式做矩阵拆分,经量使矩阵拆分得均匀,适用于稀疏矩阵
        var tMat = pMat.filter(item => id2Part(item._1, bKeyPart.value) == bI.value)
        tMat = tMat.map(item => (item._2._1, (item._1, item._2._2)))
        tMat.checkpoint()
        tMat.foreach{(_)=>{}}
        keyRdds += tMat
    }
    keyRdds.toArray
}

并行计算矩阵函数:

def multiMat0[T: ClassManifest](
                spark: SparkContext,
                mat1: RDD[(T, (T, Double))],
                mat2: RDD[(T, (T, Double))],
                multiFunc: ((T, (Seq[(T, Double)], Seq[(T, Double)]))) => TraversableOnce[((T, T), Double)],
                blockSize: Int = -1
            ): RDD[((T, T), Double)] = {
    if(blockSize == -1){
        return mat1.cogroup(mat2).flatMap(multiFunc).reduceByKey(_ + _)
    }
    val resMats = ArrayBuffer[RDD[((T, T), Double)]]()
    val maxCnt = math.max(mat1.count(), mat2.count())
    val keyPart = ((maxCnt + blockSize - 1) / blockSize).toInt
    val bKeyPart = spark.broadcast(keyPart)
    for (i <- 0 until keyPart) {
        val bI = spark.broadcast(i)
        val subMat1 = mat1.filter(items => id2Part(items._1, bKeyPart.value) == bI.value)
        val subMat2 = mat2.filter(items => id2Part(items._1, bKeyPart.value) == bI.value)
        val tMat = subMat1.cogroup(subMat2, partitionNum).flatMap(multiFunc).reduceByKey(_ + _, partitionNum)
        //checkpoint可以简单地把计算好的结果存放到HDFS中,
        //这样比用saveAsTextFile的接口好多了,
        //因为我不在需要考虑存放的位置,Spark也可以读出来使用
        tMat.checkpoint()
        resMats += tMat
    }
    resMats.reduce((m1, m2)=>m1.union(m2)).reduceByKey(_ + _, partitionNum)
}

大矩阵相乘函数:

def multiMatPartition[T: ClassManifest](spark: SparkContext,
                    mat1: RDD[(T, (T, Double))],
                    mat2: RDD[(T, (T, Double))],
                    multiFunc: ((T, (Seq[(T, Double)], Seq[(T, Double)]))) => TraversableOnce[((T, T), Double)],
                    rowCount: Long,
                    colCount: Long,
                    blockSize: Int = 100000
                ): RDD[((T, T), Double)] = {
    var subMats = ArrayBuffer[RDD[((T, T), Double)]]()
    val rowRdds = splitMatrix(spark, mat1, rowCount, blockSize)
    val colRdds = splitMatrix(spark, mat2, colCount, blockSize)
    for (i <- 0 until rowRdds.length) {
        val tMat1 = rowRdds(i)
        for (j <- 0 until colRdds.length) {
            val tMat2 = colRdds(j)
            val tRes = multiMat0(spark, tMat1, tMat2, multiFunc, blockSize)
            tRes.checkpoint()
            tRes.foreach{(_)=>{}}
            subMats += tRes
        }
    }
    val mulRes = subMats.reduce((mat1, mat2) => mat1.union(mat2))
    mulRes
}


细节调优

Spark可以通过一口气完成整个矩阵相乘的流程,但是内存是存不下这么多数据的,而Spark会根据LRU的策略把一些不再使用的结果清理出内存,这时为了保证我的计算能成功,我使用了Spark的checkpoint功能,当然这会十分影响整个框架的计算性能,但它带来的是计算稳定,已经计算出来的结果不会再重算。这里还需要注意一点是,checkpoint文件夹需要做适时的处理,不可以让里面的文件数和占用空间增大,否则就算多牛逼的集群都会支持不下去。

评论

此博客中的热门博文

iphone 自动打包脚本

最近做ios开发,经常需要给老大打ipa包,这个虽然在xcode中编译并打包是很简单的事,不过每次都得花几分钟的时间做一些手动的放入Payload并压缩成zip包的操作。比较麻烦的是,在开发过程中,突然就说要一个可以执行的包做测试。那么,思路断了,正在写的代码要注释掉,这样持续下去浪费的时间会很多,所以还是需要写一个打包脚本。 打包具体用到的命令是这些: xcodebuild: 主要用于编译项目 xcrun: 主要用于打ipa包 具体打包流程就是编译,然后打一个发布包,一个ipa包,其实用脚本来说话就好了。另外,我用一个conf.dat来存放target和configuration,这些都在xcode里面指定好了,用xxx:xxx这样的格式来存放,xcodebuild在编译的时候会自动找到对应的配置。 打包脚本如下: #!/bin/sh basePath=`pwd` distDir="target" distDir="${basePath}/${distDir}" rm -rdf "$distDir" mkdir -p "$distDir" baseName="xxx" #.app 的名字 projectDir=$(cd ../mobile/xxx; pwd) # 进入xcode工程目录 cd $projectDir for line in $(cat ${basePath}/conf.dat) do targetName=`echo $line | cut -f1 -d':'` conf=`echo $line | cut -f2 -d':'` releaseDir="${projectDir}/build/${conf}-iphoneos" rm -rdf "$releaseDir" echo "======build ${baseName}.app start..." echo "======clean ${conf}..." xcodebuild clean -configuration "$...

【转】ELO-对弈与排名(有触感的排名)

原文: https://www.cnblogs.com/leoin2012/p/4854442.html ELO介绍 ELO等级分制度 是指由 匈牙利 裔 美国 物理学家 Elo创建的一个衡量各类对弈活动水平的评价方法,是当今对弈水平评估的公认的权威方法。被广泛用于 国际象棋 、 围棋 、 足球 、 篮球 等运动。网络游戏 英雄联盟 、 魔兽世界 内的竞技对战系统也采用此分级制度。 历史 ELO等级分制度是基于 统计学 的一个评估棋手水平的方法。 美国 国际象棋 协会在1960年首先使用这种计分方法。由于它比先前的方法更公平客观,这种方法很快流行开来。1970年国际棋联正式开始使用等级分制度。 Elo模型原先采用 正态分布 。但是实践显明棋手的表现并非呈正态分布,所以现在的等级分计分系统通常使用的是 Logistic distribution 。 计分方法 假设棋手A和B的当前等级分分别为 和 ,则按Logistic distribution A对B的胜率期望值当为 类似B对A的胜率为 假如一位棋手在比赛中的真实得分 (胜=1分,和=0.5分,负=0分)和他的胜率期望值 不同,则他的等级分要作相应的调整。具体的数学公式为 公式中 和 分别为棋手调整前后的等级分。在大师级比赛中 通常为16。 例如,棋手A等级分为1613,与等级分为1573的棋手B战平。若K取32,则A的胜率期望值为 ,约为0.5573,因而A的新等级分为1613 + 32 · (0.5 − 0.5573) = 1611.166 国际象棋中的等级分 国际象棋中,等级分和棋联称号的大致对应为 2500分以上:国际特级大师 2400-2499分:国际大师 2300-2399分:棋联大师