跳至主要内容

博文

目前显示的是 2013的博文

Spark 执行调优——存储控制

使用Spark确实能非常大地提高我的工作效率,无论从使用的简易性和执行的速度来看,都比我用MapReduce写算法要快得多。在使用了将近3个月的时间,我还是坚定不移地选择使用Spark来取代我以后的数据分析和算法编写工作。 以下,就写一下使用Spark时自己的一些经验好了,没有使用Spark经验的同学可能比较难懂。 1. 关于Shuffle 现在我使用Spark写了一个wordcount: val text = spark.textFile(/path/...) val counts = text.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_) counts.saveAsTextFile(/path/...) 那么在这个过程中,DAGSheduler为我们产生了多少个Stage呢?让我们看看Stage拆分图: 从图中可以看出,reduceByKey是导致Stage拆分的主因,写过MapReduce的人都会知道,这个操作需要把相同key的数据全部聚集到一个机器才能接着进行计算,这个过程就是Shuffle。从这里可以看出,Spark的DAGScheduler会根据Shuffle来划分多个Stage。 2. 关于数据存储 2.1 存储模块 都说Spark是基于内存的分布式计算框架,号称计算过程数据完全不用落地,减少文件IO,那么这个又是怎么做的呢? BlockManager: Spark会利用一个叫BlockManager的模块做数据存储控制,这个存储包括内存和磁盘存储。 MapOutputTracker:用于记录Stage的每个分块的存储位置。 配合上面这两个工具,我们可以知道每个Stage的存储都是分布在各个Executor(工作机)中。实际上,当一个Stage在各个机器上计算好自己的分块时,它们的结果就会直接保存在所在机器中,而通常存储的结果就是Stage最后一个RDD的结果。 2.2 cache & persist 2.2.1 cache 我们可以很简单地调用cache: rdd.cache() 就可以告诉Spark我需要把这个 rdd 存到内存中,比如这个 rdd 不是Stage的最后一个RDD,S...

Spark 矩阵相乘实现

矩阵相乘计算的意义十分重要,比如我们做数据分析经常使用到的join操作就可以理解成为矩阵相乘的一个部分,矩阵相乘在很多分布式计算框架都有自己的实现,这些实现也可以根据不同大小的矩阵做不同的优化,比如在MapReduce上就有两种基本的实现: 大矩阵乘小矩阵 我可以把小矩阵放到DistrubutedCache,让每个mapper读取,这就是hive的MapJoin的实现。 两个大矩阵相乘 相乘中两个矩阵都是超大矩阵的话,为了减少MapReduce过程中产生的巨大数据,使用的带宽。都会采用矩阵分块的做法,以下会详细介绍这种实现方法。 Spark介绍 官网介绍 超大矩阵拆分计算方法 单机(单线程)矩阵相乘 假设有矩阵 A , B ,相乘的结果为 C ,那么相乘的伪代码: C = 0 for(i <- 0 to A.lenght) for(k <- 0 to A[i].lenght) if(A[i][k]!=0) for(j <- B[k].lenght) { C[i][j] += A[i][k] * B[k][j] } 单机的计算中,时间复杂度是 O(n^3),如果 n 增长到一定程度的时候,那么计算用时就会非常大,所以下面会说一下并行计算方法。 并行矩阵相乘 在计算的过程中,我们很容易发现每个 A[i][k] * B[k][j] 都可以单独计算,因此我也可以单独计算这些组合,最后做一个累计就可以获得 C 矩阵了,相乘伪代码如下: A = ((row, column), value) => (column, (row, value)) B = ((row, column), value) => (row, (column, value)) Temp = A.join(B) => (k, (rowA, valueA), (columnB, valueB)) => ((rowA, columnB), valueA * valueB) => ((rowC, columnC), valueC) C = Temp.reduceByKey =...

Spark内存溢出调试笔记

最近在玩 Spark ,它是个基于内存的分布式计算框架,用起来还是挺方便的,而且相当适合需要迭代计算的算法,比如PageRank和机器学习的算法。Spark的社区相当活跃,每天都有很多user提问,每天的commit数也是超过1000的。 以前Spark利用mesos集群或者用自己启动的集群来进行分布式计算。由于国内很多公司只维护了hadoop集群,我没能看到Spark有投入到生产中(要维护多一个mesos集群成本可是很高的)。然而,在0.6的版本中加入了 on Yarn 模块。这使得很多部署了hadoop集群的公司(升级成支持Yarn的hadoop)可以轻易的使用Spark了。 要好好玩Spark还是需要一点时间的,因为现在的版本还是有不少bug和todo的。最近玩的时候还是遇到了个比较让人头大的问题的,当我迭代调用collect方法的时候,会出现一个让人比较头大的问题: Master无缘无故就跑的很慢 使得akka超时 Yarn的ResourcesManager告诉我Master用的内存超出预算 出现这个问题之后,我还是真不知道怎么定位原因,现在解决了之后,感觉自己的编程经验还真是太少了。 print GC 首先,我PrintGC的信息,在启动参数中加入下面的参数: -verbose:gc: 输出GC信息 -XX:+PrintGCTimeStamps:输出GC时间 -XX:+PrintGCDetails:输出GC详细信息,比如新生代GC情况,永久代GC情况,内存的GC汇总等 在这里,我发现Master崩溃前会不断Full GC,并且Full GC后,老年代的内存用量没有降下来,这只可能是用的内存真的需要这么多(给了10G的内存还崩溃真说不过去),或者内存泄露。 dump 内存 这里说的dump就是抽取jvm内存使用状况的快照。启动参数里面还是有不少关于dump的参数的: -XX:-HeapDumpOnOutOfMemoryError:在OOM时,输出一个dump文件,记录当时的内存快照 -XX:HeapDumpPath=/tmp/dump.hprof:把dump信息输出到/tmp/dump.hprof中 dump文件的大小通常是jvm占用内存的大小,所以文件可能会很大。 分析...

[转]Java 6 JVM参数选项大全(中文版)

作者: Ken Wu Email: ken.wug@gmail.com 转载本文档请注明原文链接  http://kenwublog.com/docs/java6-jvm-options-chinese-edition.htm ! 本文 是基于最新的 SUN官方 文档 Java SE 6 Hotspot VM Options   编写的译文。主要介绍 JVM 中的非稳态选项及其使用说明。 为了让读者明白每个选项的含义,作者在原文基础上 补充了大量的资料 。希望这份文档,对正在研究 JVM 参数的朋友有帮助! 另外,考虑到本文档是初稿,如有描述错误,敬请指正。 非稳态 选项使用说明 -XX:+<option>  启用选项 -XX:-<option>  不启用选项 -XX:<option>=<number>  给选项设置一个数字类型值,可跟单位,例如  32k, 1024m, 2g -XX:<option>=<string>  给选项设置一个字符串值,例如 -XX:HeapDumpPath=./dump.core 行为 选项 选项 默认值与限制 描述 -XX:-AllowUserSignalHandlers 限于 Linux 和 Solaris ,默认不启用 允许为 java 进程安装信号处理器。 Java 信号处理相关知识,详见  http://kenwublog.com/java-asynchronous-notify-based-on-signal -XX:-DisableExplicitGC 默认不启用 禁止在运行期显式地调用  System.gc() 。 开启该选项后, GC 的触发时机将由 Garbage Collector 全权掌控。 注意:你熟悉的代码里没调用 System.gc() ,不代表你依赖的框架工具没在使用。 例如 RMI 就在多数用户毫不知情的情况下,显示地调用 GC 来防止自身 OOM 。 请仔细权衡禁用带来的影响。 -XX:-RelaxAccessCont...