使用Spark确实能非常大地提高我的工作效率,无论从使用的简易性和执行的速度来看,都比我用MapReduce写算法要快得多。在使用了将近3个月的时间,我还是坚定不移地选择使用Spark来取代我以后的数据分析和算法编写工作。 以下,就写一下使用Spark时自己的一些经验好了,没有使用Spark经验的同学可能比较难懂。 1. 关于Shuffle 现在我使用Spark写了一个wordcount: val text = spark.textFile(/path/...) val counts = text.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_) counts.saveAsTextFile(/path/...) 那么在这个过程中,DAGSheduler为我们产生了多少个Stage呢?让我们看看Stage拆分图: 从图中可以看出,reduceByKey是导致Stage拆分的主因,写过MapReduce的人都会知道,这个操作需要把相同key的数据全部聚集到一个机器才能接着进行计算,这个过程就是Shuffle。从这里可以看出,Spark的DAGScheduler会根据Shuffle来划分多个Stage。 2. 关于数据存储 2.1 存储模块 都说Spark是基于内存的分布式计算框架,号称计算过程数据完全不用落地,减少文件IO,那么这个又是怎么做的呢? BlockManager: Spark会利用一个叫BlockManager的模块做数据存储控制,这个存储包括内存和磁盘存储。 MapOutputTracker:用于记录Stage的每个分块的存储位置。 配合上面这两个工具,我们可以知道每个Stage的存储都是分布在各个Executor(工作机)中。实际上,当一个Stage在各个机器上计算好自己的分块时,它们的结果就会直接保存在所在机器中,而通常存储的结果就是Stage最后一个RDD的结果。 2.2 cache & persist 2.2.1 cache 我们可以很简单地调用cache: rdd.cache() 就可以告诉Spark我需要把这个 rdd 存到内存中,比如这个 rdd 不是Stage的最后一个RDD,S...