跳至主要内容

博文

目前显示的是 十月, 2013的博文

Spark 执行调优——存储控制

使用Spark确实能非常大地提高我的工作效率,无论从使用的简易性和执行的速度来看,都比我用MapReduce写算法要快得多。在使用了将近3个月的时间,我还是坚定不移地选择使用Spark来取代我以后的数据分析和算法编写工作。 以下,就写一下使用Spark时自己的一些经验好了,没有使用Spark经验的同学可能比较难懂。 1. 关于Shuffle 现在我使用Spark写了一个wordcount: val text = spark.textFile(/path/...) val counts = text.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_) counts.saveAsTextFile(/path/...) 那么在这个过程中,DAGSheduler为我们产生了多少个Stage呢?让我们看看Stage拆分图: 从图中可以看出,reduceByKey是导致Stage拆分的主因,写过MapReduce的人都会知道,这个操作需要把相同key的数据全部聚集到一个机器才能接着进行计算,这个过程就是Shuffle。从这里可以看出,Spark的DAGScheduler会根据Shuffle来划分多个Stage。 2. 关于数据存储 2.1 存储模块 都说Spark是基于内存的分布式计算框架,号称计算过程数据完全不用落地,减少文件IO,那么这个又是怎么做的呢? BlockManager: Spark会利用一个叫BlockManager的模块做数据存储控制,这个存储包括内存和磁盘存储。 MapOutputTracker:用于记录Stage的每个分块的存储位置。 配合上面这两个工具,我们可以知道每个Stage的存储都是分布在各个Executor(工作机)中。实际上,当一个Stage在各个机器上计算好自己的分块时,它们的结果就会直接保存在所在机器中,而通常存储的结果就是Stage最后一个RDD的结果。 2.2 cache & persist 2.2.1 cache 我们可以很简单地调用cache: rdd.cache() 就可以告诉Spark我需要把这个 rdd 存到内存中,比如这个 rdd 不是Stage的最后一个RDD,S...

Spark 矩阵相乘实现

矩阵相乘计算的意义十分重要,比如我们做数据分析经常使用到的join操作就可以理解成为矩阵相乘的一个部分,矩阵相乘在很多分布式计算框架都有自己的实现,这些实现也可以根据不同大小的矩阵做不同的优化,比如在MapReduce上就有两种基本的实现: 大矩阵乘小矩阵 我可以把小矩阵放到DistrubutedCache,让每个mapper读取,这就是hive的MapJoin的实现。 两个大矩阵相乘 相乘中两个矩阵都是超大矩阵的话,为了减少MapReduce过程中产生的巨大数据,使用的带宽。都会采用矩阵分块的做法,以下会详细介绍这种实现方法。 Spark介绍 官网介绍 超大矩阵拆分计算方法 单机(单线程)矩阵相乘 假设有矩阵 A , B ,相乘的结果为 C ,那么相乘的伪代码: C = 0 for(i <- 0 to A.lenght) for(k <- 0 to A[i].lenght) if(A[i][k]!=0) for(j <- B[k].lenght) { C[i][j] += A[i][k] * B[k][j] } 单机的计算中,时间复杂度是 O(n^3),如果 n 增长到一定程度的时候,那么计算用时就会非常大,所以下面会说一下并行计算方法。 并行矩阵相乘 在计算的过程中,我们很容易发现每个 A[i][k] * B[k][j] 都可以单独计算,因此我也可以单独计算这些组合,最后做一个累计就可以获得 C 矩阵了,相乘伪代码如下: A = ((row, column), value) => (column, (row, value)) B = ((row, column), value) => (row, (column, value)) Temp = A.join(B) => (k, (rowA, valueA), (columnB, valueB)) => ((rowA, columnB), valueA * valueB) => ((rowC, columnC), valueC) C = Temp.reduceByKey =...