矩阵相乘计算的意义十分重要,比如我们做数据分析经常使用到的join操作就可以理解成为矩阵相乘的一个部分,矩阵相乘在很多分布式计算框架都有自己的实现,这些实现也可以根据不同大小的矩阵做不同的优化,比如在MapReduce上就有两种基本的实现:
考虑到这种计算难度,或者说还没有比较好能进行并行异步计算的框架,这种矩阵计算就需要拆分成多个小矩阵来做乘法。
如上图,其实矩阵拆分差不多就是把整个大矩阵划分成一个个方块,我们可以分别从
矩阵相乘的入口函数如下:
分块函数:
并行计算矩阵函数:
大矩阵相乘函数:
- 大矩阵乘小矩阵
我可以把小矩阵放到DistrubutedCache,让每个mapper读取,这就是hive的MapJoin的实现。 - 两个大矩阵相乘
相乘中两个矩阵都是超大矩阵的话,为了减少MapReduce过程中产生的巨大数据,使用的带宽。都会采用矩阵分块的做法,以下会详细介绍这种实现方法。
Spark介绍
官网介绍超大矩阵拆分计算方法
单机(单线程)矩阵相乘
假设有矩阵A,B,相乘的结果为C,那么相乘的伪代码:C = 0
for(i <- 0 to A.lenght)
for(k <- 0 to A[i].lenght) if(A[i][k]!=0)
for(j <- B[k].lenght) {
C[i][j] += A[i][k] * B[k][j]
}
单机的计算中,时间复杂度是 O(n^3),如果 n 增长到一定程度的时候,那么计算用时就会非常大,所以下面会说一下并行计算方法。并行矩阵相乘
在计算的过程中,我们很容易发现每个A[i][k] * B[k][j]都可以单独计算,因此我也可以单独计算这些组合,最后做一个累计就可以获得C矩阵了,相乘伪代码如下:A = ((row, column), value) => (column, (row, value))
B = ((row, column), value) => (row, (column, value))
Temp = A.join(B) => (k, (rowA, valueA), (columnB, valueB))
=> ((rowA, columnB), valueA * valueB)
=> ((rowC, columnC), valueC)
C = Temp.reduceByKey => ((rowC, columnC), sumC)
- 矩阵
A、B的表示方式不是数组的形式,而是一个数据集合的形式,上面的格式就是这个数据集合每一行数据的格式 =>表示数据格式的转换join与reduceByKey跟Hive或SQL的语法类似,可以理解成为并发操作
大矩阵拆分
当矩阵A、B膨胀到一定大小,比如行列大小均为100万的超大矩阵,那么进行并行相乘的时候就会产生大量的中间数据,以上面伪代码中的Temp为例,中间数据条数将会达到10^18条,如果按照每行数据存储时使用2个Int和1个Double,那么总共使用的存储空间为 10000PB,这个数据量差不多是100亿个A.V.I(你们懂的)视频,这个估计就Google能一次存下来了。考虑到这种计算难度,或者说还没有比较好能进行并行异步计算的框架,这种矩阵计算就需要拆分成多个小矩阵来做乘法。
如上图,其实矩阵拆分差不多就是把整个大矩阵划分成一个个方块,我们可以分别从
A和B中随便挑选两个矩阵做并行矩阵相乘,中间数据都不至于超出我们能存储的范围。Spark实现
选择Spark来写这份代码有两个好处,(当然还有其他好处了)- Spark的编程接口几乎与函数式编程的方式相同,只要梳理好逻辑,就非常容易编写,并且我可以一气呵成的写完所有代码,无需像MapReduce写一堆冗长的代码何定义多个类。
- Spark的计算所产生的中间数据都可以使用内存来缓存,只要申请足够的内存就可以加速我的计算了。
矩阵相乘的入口函数如下:
def multiMat[T: ClassManifest](
spark: SparkContext,
mat1: RDD[(Int, (Int, Double))],
mat2: RDD[(Int, (Int, Double))],
multiFunc: ((T, (Seq[(T, Double)], Seq[(T, Double)]))) => TraversableOnce[((T, T), Double)]
): RDD[((Int, Int), Double)] = {
var res: RDD[((Int, Int), Double)] = null
val cntRow = mat1.map(item => item._2._1).distinct().count()
val cntCol = mat2.map(item => item._2._1).distinct().count()
//拆分矩阵中,每个小矩阵的维度设定为100000
val blockSize = 100000
//如果维度少于100000,就没有必要再做矩阵拆分计算了
if ((cntRow / blockSize) * (cntCol / blockSize) > 1) {
res = multiMatPartition(spark, mat1, mat2, multiFunc, cntRow, cntCol, blockSize)
} else {
res = multiMat0(spark, mat1, mat2, multiFunc, blockSize)
}
res
}
分块函数:
def id2Part[T](id: T, partNum: Int)(implicit cm: ClassManifest[T]): Long = {
(id.hashCode() & 0x7FFFFFFF) % partNum
}
def splitMatrix[T: ClassManifest](spark: SparkContext,
mat: RDD[(T, (T, Double))],
keyCount: Long,
blockSize: Int
): Array[RDD[(T, (T, Double))]] = {
val keyPart = ((keyCount + blockSize - 1)/ blockSize).toInt
val bKeyPart = spark.broadcast(keyPart)
val pMat = mat.map(item => (item._2._1, (item._1, item._2._2)))
val keyRdds = ArrayBuffer[RDD[(T, (T, Double))]]()
for (i <- 0 until keyPart) {
val bI = spark.broadcast(i)
//hash的方式做矩阵拆分,经量使矩阵拆分得均匀,适用于稀疏矩阵
var tMat = pMat.filter(item => id2Part(item._1, bKeyPart.value) == bI.value)
tMat = tMat.map(item => (item._2._1, (item._1, item._2._2)))
tMat.checkpoint()
tMat.foreach{(_)=>{}}
keyRdds += tMat
}
keyRdds.toArray
}
并行计算矩阵函数:
def multiMat0[T: ClassManifest](
spark: SparkContext,
mat1: RDD[(T, (T, Double))],
mat2: RDD[(T, (T, Double))],
multiFunc: ((T, (Seq[(T, Double)], Seq[(T, Double)]))) => TraversableOnce[((T, T), Double)],
blockSize: Int = -1
): RDD[((T, T), Double)] = {
if(blockSize == -1){
return mat1.cogroup(mat2).flatMap(multiFunc).reduceByKey(_ + _)
}
val resMats = ArrayBuffer[RDD[((T, T), Double)]]()
val maxCnt = math.max(mat1.count(), mat2.count())
val keyPart = ((maxCnt + blockSize - 1) / blockSize).toInt
val bKeyPart = spark.broadcast(keyPart)
for (i <- 0 until keyPart) {
val bI = spark.broadcast(i)
val subMat1 = mat1.filter(items => id2Part(items._1, bKeyPart.value) == bI.value)
val subMat2 = mat2.filter(items => id2Part(items._1, bKeyPart.value) == bI.value)
val tMat = subMat1.cogroup(subMat2, partitionNum).flatMap(multiFunc).reduceByKey(_ + _, partitionNum)
//checkpoint可以简单地把计算好的结果存放到HDFS中,
//这样比用saveAsTextFile的接口好多了,
//因为我不在需要考虑存放的位置,Spark也可以读出来使用
tMat.checkpoint()
resMats += tMat
}
resMats.reduce((m1, m2)=>m1.union(m2)).reduceByKey(_ + _, partitionNum)
}
大矩阵相乘函数:
def multiMatPartition[T: ClassManifest](spark: SparkContext,
mat1: RDD[(T, (T, Double))],
mat2: RDD[(T, (T, Double))],
multiFunc: ((T, (Seq[(T, Double)], Seq[(T, Double)]))) => TraversableOnce[((T, T), Double)],
rowCount: Long,
colCount: Long,
blockSize: Int = 100000
): RDD[((T, T), Double)] = {
var subMats = ArrayBuffer[RDD[((T, T), Double)]]()
val rowRdds = splitMatrix(spark, mat1, rowCount, blockSize)
val colRdds = splitMatrix(spark, mat2, colCount, blockSize)
for (i <- 0 until rowRdds.length) {
val tMat1 = rowRdds(i)
for (j <- 0 until colRdds.length) {
val tMat2 = colRdds(j)
val tRes = multiMat0(spark, tMat1, tMat2, multiFunc, blockSize)
tRes.checkpoint()
tRes.foreach{(_)=>{}}
subMats += tRes
}
}
val mulRes = subMats.reduce((mat1, mat2) => mat1.union(mat2))
mulRes
}

评论
发表评论