跳至主要内容

Spark 执行调优——存储控制

使用Spark确实能非常大地提高我的工作效率,无论从使用的简易性和执行的速度来看,都比我用MapReduce写算法要快得多。在使用了将近3个月的时间,我还是坚定不移地选择使用Spark来取代我以后的数据分析和算法编写工作。
以下,就写一下使用Spark时自己的一些经验好了,没有使用Spark经验的同学可能比较难懂。


1. 关于Shuffle

现在我使用Spark写了一个wordcount:
val text = spark.textFile(/path/...)
val counts = text.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_)
counts.saveAsTextFile(/path/...)
那么在这个过程中,DAGSheduler为我们产生了多少个Stage呢?让我们看看Stage拆分图:


从图中可以看出,reduceByKey是导致Stage拆分的主因,写过MapReduce的人都会知道,这个操作需要把相同key的数据全部聚集到一个机器才能接着进行计算,这个过程就是Shuffle。从这里可以看出,Spark的DAGScheduler会根据Shuffle来划分多个Stage。


2. 关于数据存储

2.1 存储模块

都说Spark是基于内存的分布式计算框架,号称计算过程数据完全不用落地,减少文件IO,那么这个又是怎么做的呢?
  • BlockManager: Spark会利用一个叫BlockManager的模块做数据存储控制,这个存储包括内存和磁盘存储。
  • MapOutputTracker:用于记录Stage的每个分块的存储位置。
配合上面这两个工具,我们可以知道每个Stage的存储都是分布在各个Executor(工作机)中。实际上,当一个Stage在各个机器上计算好自己的分块时,它们的结果就会直接保存在所在机器中,而通常存储的结果就是Stage最后一个RDD的结果。


2.2 cache & persist

2.2.1 cache
我们可以很简单地调用cache:
rdd.cache()
就可以告诉Spark我需要把这个rdd存到内存中,比如这个rdd不是Stage的最后一个RDD,Spark也会使用BlockManager让它保存好。
2.2.2 persist
说到cache就不得不提persist,其实cache是persist的一个封装而已。persist提供DISK_ONLYMEMORY_ONLYMEMORY_AND_DISK等存储方法,可以方便控制我们需要存储的地方。
2.2.3 unpersist
unpersist可以对persist过的RDD进行清理,这样就可以节约出内存来做后续的计算。


3. 存储容灾

以下一内存为例,磁盘存储的容灾方式差不多。


3.1 存储的数据过多

BlockManager对存储的控制是使用LRU的策略,就是最长时间没有使用的数据将会优先清理。


3.2 某些Executor失去连接或挂掉

这样一般会导致某些Stage失去部分数据,比如20%,这是Spark并不会急于重新计算,而是等到有其他的Stage来请求这个失去数据的Stage的时候,去重新计算失去部分的数据,这个策略真是相当智能。


3.3 参数调优

根据以上的说法,Spark真是一个吃内存的大户,多少内存都可以吃掉,这是有一个比较重要的参数可以调节
  • spark.storage.memoryFraction=0.66
这个参数表明Spark会使用现有的百分之多少内存来做缓存,默认值为66%,这个参数的分配非常重要。分多了,计算时能使用的内存就少了,特别Shuffle很多数据的时候,很容易就会导致OOM,增大了Executor丢失的概率;分少了,计算完一个Stage想存下来,MemoryStory一旦抓到机会就会清理掉数据,这样后面有依赖这个Stage的计算尽量,Spark需要重新计算了,虽然这对用户透明,但对效率影响相到大。


3.4 极限情况

当我们整个Spark集群总共拥有400G内存,而一个Stage就占用了100G以上的内存,可想而知这对存储和Shuffle都有很大的压力。这时候可以使用checkpoint。
checkpint
checkpoint可以把数据存放到hdfs中,这就是所谓的数据落地,虽然这违背了Spark的计算原则,但在这种数据量面前我们还是需要屈服,最完美得是,Spark知道checkpoint这个东西存在,不用我们特别指定位置,Spark在计算的时候会按需使用。
checkpoint使用:
spark.setCheckpointDir(/path/...)
...
rdd.checkpoint() //action函数执行之前使用

评论

此博客中的热门博文

Spark 矩阵相乘实现

矩阵相乘计算的意义十分重要,比如我们做数据分析经常使用到的join操作就可以理解成为矩阵相乘的一个部分,矩阵相乘在很多分布式计算框架都有自己的实现,这些实现也可以根据不同大小的矩阵做不同的优化,比如在MapReduce上就有两种基本的实现: 大矩阵乘小矩阵 我可以把小矩阵放到DistrubutedCache,让每个mapper读取,这就是hive的MapJoin的实现。 两个大矩阵相乘 相乘中两个矩阵都是超大矩阵的话,为了减少MapReduce过程中产生的巨大数据,使用的带宽。都会采用矩阵分块的做法,以下会详细介绍这种实现方法。 Spark介绍 官网介绍 超大矩阵拆分计算方法 单机(单线程)矩阵相乘 假设有矩阵 A , B ,相乘的结果为 C ,那么相乘的伪代码: C = 0 for(i <- 0 to A.lenght) for(k <- 0 to A[i].lenght) if(A[i][k]!=0) for(j <- B[k].lenght) { C[i][j] += A[i][k] * B[k][j] } 单机的计算中,时间复杂度是 O(n^3),如果 n 增长到一定程度的时候,那么计算用时就会非常大,所以下面会说一下并行计算方法。 并行矩阵相乘 在计算的过程中,我们很容易发现每个 A[i][k] * B[k][j] 都可以单独计算,因此我也可以单独计算这些组合,最后做一个累计就可以获得 C 矩阵了,相乘伪代码如下: A = ((row, column), value) => (column, (row, value)) B = ((row, column), value) => (row, (column, value)) Temp = A.join(B) => (k, (rowA, valueA), (columnB, valueB)) => ((rowA, columnB), valueA * valueB) => ((rowC, columnC), valueC) C = Temp.reduceByKey =...

iphone 自动打包脚本

最近做ios开发,经常需要给老大打ipa包,这个虽然在xcode中编译并打包是很简单的事,不过每次都得花几分钟的时间做一些手动的放入Payload并压缩成zip包的操作。比较麻烦的是,在开发过程中,突然就说要一个可以执行的包做测试。那么,思路断了,正在写的代码要注释掉,这样持续下去浪费的时间会很多,所以还是需要写一个打包脚本。 打包具体用到的命令是这些: xcodebuild: 主要用于编译项目 xcrun: 主要用于打ipa包 具体打包流程就是编译,然后打一个发布包,一个ipa包,其实用脚本来说话就好了。另外,我用一个conf.dat来存放target和configuration,这些都在xcode里面指定好了,用xxx:xxx这样的格式来存放,xcodebuild在编译的时候会自动找到对应的配置。 打包脚本如下: #!/bin/sh basePath=`pwd` distDir="target" distDir="${basePath}/${distDir}" rm -rdf "$distDir" mkdir -p "$distDir" baseName="xxx" #.app 的名字 projectDir=$(cd ../mobile/xxx; pwd) # 进入xcode工程目录 cd $projectDir for line in $(cat ${basePath}/conf.dat) do targetName=`echo $line | cut -f1 -d':'` conf=`echo $line | cut -f2 -d':'` releaseDir="${projectDir}/build/${conf}-iphoneos" rm -rdf "$releaseDir" echo "======build ${baseName}.app start..." echo "======clean ${conf}..." xcodebuild clean -configuration "$...

【转】ELO-对弈与排名(有触感的排名)

原文: https://www.cnblogs.com/leoin2012/p/4854442.html ELO介绍 ELO等级分制度 是指由 匈牙利 裔 美国 物理学家 Elo创建的一个衡量各类对弈活动水平的评价方法,是当今对弈水平评估的公认的权威方法。被广泛用于 国际象棋 、 围棋 、 足球 、 篮球 等运动。网络游戏 英雄联盟 、 魔兽世界 内的竞技对战系统也采用此分级制度。 历史 ELO等级分制度是基于 统计学 的一个评估棋手水平的方法。 美国 国际象棋 协会在1960年首先使用这种计分方法。由于它比先前的方法更公平客观,这种方法很快流行开来。1970年国际棋联正式开始使用等级分制度。 Elo模型原先采用 正态分布 。但是实践显明棋手的表现并非呈正态分布,所以现在的等级分计分系统通常使用的是 Logistic distribution 。 计分方法 假设棋手A和B的当前等级分分别为 和 ,则按Logistic distribution A对B的胜率期望值当为 类似B对A的胜率为 假如一位棋手在比赛中的真实得分 (胜=1分,和=0.5分,负=0分)和他的胜率期望值 不同,则他的等级分要作相应的调整。具体的数学公式为 公式中 和 分别为棋手调整前后的等级分。在大师级比赛中 通常为16。 例如,棋手A等级分为1613,与等级分为1573的棋手B战平。若K取32,则A的胜率期望值为 ,约为0.5573,因而A的新等级分为1613 + 32 · (0.5 − 0.5573) = 1611.166 国际象棋中的等级分 国际象棋中,等级分和棋联称号的大致对应为 2500分以上:国际特级大师 2400-2499分:国际大师 2300-2399分:棋联大师