使用Spark确实能非常大地提高我的工作效率,无论从使用的简易性和执行的速度来看,都比我用MapReduce写算法要快得多。在使用了将近3个月的时间,我还是坚定不移地选择使用Spark来取代我以后的数据分析和算法编写工作。
以下,就写一下使用Spark时自己的一些经验好了,没有使用Spark经验的同学可能比较难懂。
从图中可以看出,reduceByKey是导致Stage拆分的主因,写过MapReduce的人都会知道,这个操作需要把相同key的数据全部聚集到一个机器才能接着进行计算,这个过程就是Shuffle。从这里可以看出,Spark的DAGScheduler会根据Shuffle来划分多个Stage。
checkpoint使用:
以下,就写一下使用Spark时自己的一些经验好了,没有使用Spark经验的同学可能比较难懂。
1. 关于Shuffle
现在我使用Spark写了一个wordcount:val text = spark.textFile(/path/...)
val counts = text.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_)
counts.saveAsTextFile(/path/...)
那么在这个过程中,DAGSheduler为我们产生了多少个Stage呢?让我们看看Stage拆分图:从图中可以看出,reduceByKey是导致Stage拆分的主因,写过MapReduce的人都会知道,这个操作需要把相同key的数据全部聚集到一个机器才能接着进行计算,这个过程就是Shuffle。从这里可以看出,Spark的DAGScheduler会根据Shuffle来划分多个Stage。
2. 关于数据存储
2.1 存储模块
都说Spark是基于内存的分布式计算框架,号称计算过程数据完全不用落地,减少文件IO,那么这个又是怎么做的呢?- BlockManager: Spark会利用一个叫BlockManager的模块做数据存储控制,这个存储包括内存和磁盘存储。
- MapOutputTracker:用于记录Stage的每个分块的存储位置。
2.2 cache & persist
2.2.1 cache
我们可以很简单地调用cache:rdd.cache()就可以告诉Spark我需要把这个
rdd存到内存中,比如这个rdd不是Stage的最后一个RDD,Spark也会使用BlockManager让它保存好。2.2.2 persist
说到cache就不得不提persist,其实cache是persist的一个封装而已。persist提供DISK_ONLY,MEMORY_ONLY,MEMORY_AND_DISK等存储方法,可以方便控制我们需要存储的地方。2.2.3 unpersist
unpersist可以对persist过的RDD进行清理,这样就可以节约出内存来做后续的计算。3. 存储容灾
以下一内存为例,磁盘存储的容灾方式差不多。3.1 存储的数据过多
BlockManager对存储的控制是使用LRU的策略,就是最长时间没有使用的数据将会优先清理。3.2 某些Executor失去连接或挂掉
这样一般会导致某些Stage失去部分数据,比如20%,这是Spark并不会急于重新计算,而是等到有其他的Stage来请求这个失去数据的Stage的时候,去重新计算失去部分的数据,这个策略真是相当智能。3.3 参数调优
根据以上的说法,Spark真是一个吃内存的大户,多少内存都可以吃掉,这是有一个比较重要的参数可以调节- spark.storage.memoryFraction=0.66
3.4 极限情况
当我们整个Spark集群总共拥有400G内存,而一个Stage就占用了100G以上的内存,可想而知这对存储和Shuffle都有很大的压力。这时候可以使用checkpoint。checkpint
checkpoint可以把数据存放到hdfs中,这就是所谓的数据落地,虽然这违背了Spark的计算原则,但在这种数据量面前我们还是需要屈服,最完美得是,Spark知道checkpoint这个东西存在,不用我们特别指定位置,Spark在计算的时候会按需使用。checkpoint使用:
spark.setCheckpointDir(/path/...) ... rdd.checkpoint() //action函数执行之前使用

评论
发表评论