
[Translation] MapReduce Patterns, Algorithms, and Use Cases



Original article: http://highlyscalable.wordpress.com/2012/02/01/mapreduce-patterns/






Posted on February 1, 2012

In this article I have digested a number of MapReduce patterns and algorithms to give a systematic view of the different techniques that can be found on the web or in scientific articles. Several practical case studies are also provided. All descriptions and code snippets use the standard Hadoop MapReduce model with Mappers, Reducers, Combiners, Partitioners, and sorting. This framework is depicted in the figure below.

[Figure: MapReduce Framework]

Basic MapReduce Patterns

Counting and Summing

Problem Statement: There are a number of documents, each of which is a set of terms. The task is to calculate the total number of occurrences of each term across all documents. Alternatively, the task can be to compute an arbitrary function of the terms. For instance, given a log file in which each record contains a response time, the task is to calculate the average response time.

Solution:

Let's start with something really simple. In the code snippet below, the Mapper simply emits "1" for each term it processes, and the Reducer goes through the lists of ones and sums them up:

 

class Mapper
   method Map(docid id, doc d)
      for all term t in doc d do
         Emit(term t, count 1)

class Reducer
   method Reduce(term t, counts [c1, c2,...])
      sum = 0
      for all count c in [c1, c2,...] do
          sum = sum + c
      Emit(term t, count sum)

 

The obvious disadvantage of this approach is the large number of dummy counters emitted by the Mapper. The Mapper can reduce the number of counters by summing them for each document:

 

class Mapper
   method Map(docid id, doc d)
      H = new AssociativeArray
      for all term t in doc d do
          H{t} = H{t} + 1
      for all term t in H do
         Emit(term t, count H{t})

 

To accumulate counters not just for one document but for all documents processed by one Mapper node, it is possible to use Combiners:

 

class Mapper
   method Map(docid id, doc d)
      for all term t in doc d do
         Emit(term t, count 1)

class Combiner
   method Combine(term t, [c1, c2,...])
      sum = 0
      for all count c in [c1, c2,...] do
          sum = sum + c
      Emit(term t, count sum)

class Reducer
   method Reduce(term t, counts [c1, c2,...])
      sum = 0
      for all count c in [c1, c2,...] do
          sum = sum + c
      Emit(term t, count sum)
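The counting scheme above can be tried out locally with a small Python sketch. It is not Hadoop code: the shuffle phase is imitated with a dictionary, the per-node accumulation stands in for the Combiner, and the names map_with_combining, shuffle, and reduce_counts are hypothetical.

```python
from collections import Counter, defaultdict

def map_with_combining(docs):
    """Mapper plus Combiner for one node: emit (term, partial_count) pairs."""
    partial = Counter()
    for doc in docs:              # each doc is a list of terms
        partial.update(doc)       # in-memory accumulation replaces many (term, 1) pairs
    return list(partial.items())

def shuffle(emitted):
    """Group emitted values by key, as the framework does between Map and Reduce."""
    groups = defaultdict(list)
    for key, value in emitted:
        groups[key].append(value)
    return groups

def reduce_counts(groups):
    """Reducer: sum the partial counts for each term."""
    return {term: sum(counts) for term, counts in groups.items()}

# Two hypothetical mapper nodes, each holding its own input split.
node1 = [["a", "b", "a"], ["b", "c"]]
node2 = [["a", "c", "c"]]
emitted = map_with_combining(node1) + map_with_combining(node2)
totals = reduce_counts(shuffle(emitted))
```

Here the two nodes emit five partial counters in total instead of eight single "1" counters, which is the saving that Combiners are meant to provide.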

 

Applications:

Log Analysis, Data Querying

 

Collating

Problem Statement: There is a set of items and some function of one item. The task is to save all items that have the same value of the function into one file, or to perform some other computation that requires all such items to be processed as a group. The most typical example is building inverted indexes.

Solution:

The solution is straightforward. The Mapper computes the given function for each item and emits the value of the function as the key and the item itself as the value. The Reducer obtains all items grouped by function value and processes or saves them. In the case of inverted indexes, items are terms (words) and the function is the ID of the document where the term was found.
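As an illustration of collating, here is a minimal Python sketch of an inverted index. It assumes whitespace tokenization and an in-memory dictionary in place of the shuffle phase. The function names and the sample documents are made up for the example.

```python
from collections import defaultdict

def map_doc(doc_id, text):
    """Mapper: emit (term, doc_id) for every distinct term in the document."""
    for term in set(text.lower().split()):
        yield term, doc_id

def build_inverted_index(docs):
    groups = defaultdict(list)          # stands in for the shuffle phase
    for doc_id, text in docs.items():
        for term, value in map_doc(doc_id, text):
            groups[term].append(value)
    # Reducer: store the sorted posting list for each term.
    return {term: sorted(ids) for term, ids in groups.items()}

docs = {1: "map reduce patterns", 2: "reduce side join", 3: "map side join"}
index = build_inverted_index(docs)
```

Each entry of index maps a term to the list of documents that contain it, which is the grouped output a Reducer would save.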


Applications:

Inverted Indexes, ETL

 

Filtering ("Grepping"), Parsing, and Validation

Problem Statement: There is a set of records, and the task is to collect all records that meet some condition or to transform each record (independently of the other records) into another representation. The latter case includes such tasks as text parsing, value extraction, and conversion from one format to another.

Solution: The solution is absolutely straightforward: the Mapper takes records one by one and emits the accepted items or their transformed versions.

Applications:

Log Analysis, Data Querying, ETL, Data Validation

 

Distributed Task Execution

Problem Statement: There is a large computational problem that can be divided into multiple parts, and the results from all parts can be combined to obtain the final result.

Solution: The problem description is split into a set of specifications, and the specifications are stored as input data for the Mappers. Each Mapper takes a specification, performs the corresponding computations, and emits the results. The Reducer combines all emitted parts into the final result.

Case Study: Simulation of a Digital Communication System

There is a software simulator of a digital communication system like WiMAX that passes some volume of random data through the system model and computes the error probability of the throughput. Each Mapper runs the simulation for a specified amount of data, which is 1/Nth of the required sample, and emits the error rate. The Reducer computes the average error rate.
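The split-simulate-average idea can be sketched in Python as follows. The channel here is an assumed toy model (each bit is flipped independently with probability flip_prob), not a WiMAX simulator, and the specification format (seed, n_bits, flip_prob) is hypothetical.

```python
import random

def map_simulate(seed, n_bits, flip_prob):
    """Mapper: push n_bits random bits through a toy noisy channel, emit the error rate."""
    rng = random.Random(seed)           # one independent seed per specification
    errors = 0
    for _ in range(n_bits):
        sent = rng.getrandbits(1)
        received = sent ^ (rng.random() < flip_prob)   # flip the bit with probability flip_prob
        errors += sent != received
    return errors / n_bits

def reduce_average(rates):
    """Reducer: average the error rates (valid because all parts have equal size)."""
    return sum(rates) / len(rates)

total_bits, n_mappers, flip_prob = 100_000, 4, 0.1
specs = [(seed, total_bits // n_mappers, flip_prob) for seed in range(n_mappers)]
error_rate = reduce_average([map_simulate(*spec) for spec in specs])
```

A plain average is only correct because every specification covers the same number of bits. With unequal parts, the Mappers would have to emit error and bit counts instead of rates.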

Applications:

Physical and Engineering Simulations, Numerical Analysis, Performance Testing

 

Sorting

Problem Statement: There is a set of records, and the task is to sort these records by some rule or to process them in a certain order.

Solution: Simple sorting is absolutely straightforward: Mappers just emit all items as values associated with sorting keys that are assembled as a function of the items. Nevertheless, in practice sorting is often used in quite tricky ways, which is why it is said to be the heart of MapReduce (and Hadoop). In particular, it is very common to use composite keys to achieve secondary sorting and grouping.

Sorting in MapReduce is originally intended for sorting the emitted key-value pairs by key, but there exist techniques that leverage Hadoop implementation specifics to achieve sorting by values. See this blog for more details.

It is worth noting that if MapReduce is used for sorting the original (not intermediate) data, it is often a good idea to continuously maintain the data in a sorted state using BigTable concepts. In other words, it can be more efficient to sort data once during insertion than to sort it for each MapReduce query.
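The composite-key technique mentioned above can be sketched in Python. This is a local imitation under simple assumptions: records are (user, timestamp, action) triples invented for the example, the framework's sort is a list sort on the full composite key, and grouping uses only the natural key (the user).

```python
from itertools import groupby

def map_record(record):
    """Mapper: build a composite key (natural key, secondary field)."""
    user, timestamp, action = record
    return (user, timestamp), action

def secondary_sort(records):
    emitted = [map_record(r) for r in records]
    # The framework sorts by the full composite key ...
    emitted.sort(key=lambda kv: kv[0])
    # ... but groups (and partitions) by the natural key only.
    result = {}
    for user, group in groupby(emitted, key=lambda kv: kv[0][0]):
        result[user] = [action for _, action in group]   # values arrive ordered by timestamp
    return result

records = [("bob", 3, "logout"), ("alice", 2, "click"),
           ("bob", 1, "login"), ("alice", 1, "login")]
ordered = secondary_sort(records)
```

Each Reducer call then sees the values for one user already ordered by timestamp, without buffering and sorting them itself.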


Applications:

ETL, Data Analysis

 

Not-So-Basic MapReduce Patterns

Iterative Message Passing (Graph Processing)

Problem Statement: There is a network of entities and relationships between them. The task is to calculate the state of each entity on the basis of the properties of the other entities in its neighborhood. This state can represent a distance to other nodes, an indication that there is a neighbor with certain properties, a characteristic of neighborhood density, and so on.

Solution: A network is stored as a set of nodes, and each node contains a list of adjacent node IDs. Conceptually, MapReduce jobs are performed iteratively, and at each iteration each node sends messages to its neighbors. Each neighbor updates its state on the basis of the received messages. Iterations are terminated by some condition, such as a fixed maximum number of iterations (say, the network diameter) or negligible changes in states between two consecutive iterations. From the technical point of view, the Mapper emits messages for each node using the ID of the adjacent node as the key. As a result, all messages are grouped by the receiving node, and the Reducer is able to recompute the state and rewrite the node with the new state. The algorithm is shown below:

 

class Mapper
   method Map(id n, object N)
      Emit(id n, object N)                      // pass the node itself along
      for all id m in N.OutgoingRelations do
         Emit(id m, message getMessage(N))

class Reducer
   method Reduce(id m, [s1, s2,...])
      M = null
      messages = []
      for all s in [s1, s2,...] do
          if IsObject(s) then
             M = s
          else               // s is a message
             messages.add(s)
      M.State = calculateState(M.State, messages)
      Emit(id m, item M)

 

It should be emphasized that the state of one node rapidly propagates across the whole network if the network is not too sparse, because all nodes that were "infected" by this state start to "infect" all their neighbors. This process is illustrated in the figure below:

Case Study: Availability Propagation Through The Tree of Categories

Problem Statement: This problem is inspired by a real-life eCommerce task. There is a tree of categories that branches out from large categories (like Men, Women, Kids) to smaller ones (like Men Jeans or Women Dresses), and eventually to small end-of-line categories (like Men Blue Jeans). An end-of-line category is either available (contains products) or not. A higher-level category is available if there is at least one available end-of-line category in its subtree. The goal is to calculate the availability of all categories, given that the availability of the end-of-line categories is known.

Solution: This problem can be solved using the framework described in the previous section. We define the getMessage and calculateState methods as follows:

 

class N
   State in {True = 2, False = 1, null = 0}, initialized 1 or 2 for end-of-line categories, 0 otherwise

method getMessage(object N)
   return N.State

method calculateState(state s, data [d1, d2,...])
   return max( [s, d1, d2,...] )   // include s so that a category with no incoming messages keeps its state

 

Case Study: Breadth-First Search

Problem Statement: There is a graph, and the task is to calculate the distance (the number of hops) from one source node to all other nodes in the graph.

Solution: The source node, whose distance is 0, sends messages to all its neighbors, and these neighbors propagate the counter, incrementing it by 1 on each hop. (Translator's note: if the edges carry weights, the same iterative state update can be used, applying edge relaxation.)

 

class N
   State is distance, initialized 0 for source node, INFINITY for all other nodes

method getMessage(N)
   return N.State + 1

method calculateState(state s, data [d1, d2,...])
   return min( [s, d1, d2,...] )   // include s so that an already known shorter distance is kept
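The breadth-first search above can be simulated in a few lines of Python. This sketch assumes the graph is an in-memory adjacency dictionary and runs each "job" as a function call. The names bfs_iteration and bfs are hypothetical, and the new state is the minimum of the current distance and all received messages.

```python
import math
from collections import defaultdict

def bfs_iteration(graph, dist):
    """One MapReduce pass: every node sends dist + 1 along its outgoing edges."""
    messages = defaultdict(list)
    for node, neighbors in graph.items():          # Map phase
        for m in neighbors:
            messages[m].append(dist[node] + 1)
    # Reduce phase: keep the smallest distance seen so far, including the current one.
    return {n: min([dist[n]] + messages[n]) for n in graph}

def bfs(graph, source):
    dist = {n: (0 if n == source else math.inf) for n in graph}
    while True:
        new_dist = bfs_iteration(graph, dist)
        if new_dist == dist:                        # stop when no state changes
            return dist
        dist = new_dist

graph = {"a": ["b", "c"], "b": ["d"], "c": ["d"], "d": ["e"], "e": [], "f": ["a"]}
distances = bfs(graph, "a")
```

The loop stops once an iteration changes nothing, which corresponds to the "negligible changes in states" termination condition. Node f cannot be reached from a, so its distance stays infinite.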

 

Case Study: PageRank and Mapper-Side Data Aggregation

This algorithm was suggested by Google to calculate the relevance of a web page as a function of the authoritativeness (PageRank) of the pages that link to it. The real algorithm is quite complex, but at its core it is just a propagation of weights between nodes, where each node calculates its weight from the incoming weights (in the simplified form below, as their sum):


 

class N
    State is PageRank

method getMessage(object N)
    return N.State / N.OutgoingRelations.size()

method calculateState(state s, data [d1, d2,...])
    return ( sum([d1, d2,...]) )

 

It is worth mentioning that the schema we use is too generic and doesn't take advantage of the fact that the state is a numerical value. In most practical cases, this fact allows us to aggregate values on the Mapper side. This optimization is illustrated in the code snippet below (for the PageRank algorithm):

 

class Mapper
   method Initialize
      H = new AssociativeArray
   method Map(id n, object N)
      p = N.PageRank  / N.OutgoingRelations.size()
      Emit(id n, object N)
      for all id m in N.OutgoingRelations do
         H{m} = H{m} + p
   method Close
      for all id n in H do
         Emit(id n, value H{n})

class Reducer
   method Reduce(id m, [s1, s2,...])
      M = null
      p = 0
      for all s in [s1, s2,...] do
          if IsObject(s) then
             M = s
          else
             p = p + s
      M.PageRank = p
      Emit(id m, item M)
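One iteration of this simplified PageRank with mapper-side aggregation might look like the following Python sketch. It follows the pseudocode above, so there is no damping factor and no handling of dangling nodes. The tagged-tuple message format ("node" or "rank") and the two input splits are assumptions made for the example.

```python
from collections import defaultdict

def map_split(nodes):
    """Mapper for one input split: aggregate contributions in memory, emit once in Close."""
    H = defaultdict(float)
    emitted = []
    for n, (rank, out_links) in nodes.items():
        emitted.append((n, ("node", out_links)))       # pass the graph structure along
        for m in out_links:
            H[m] += rank / len(out_links)
    emitted.extend((m, ("rank", p)) for m, p in H.items())   # the Close step
    return emitted

def reduce_all(emitted):
    """Reducer: restore each node and set its rank to the sum of incoming contributions."""
    links, ranks = {}, defaultdict(float)
    for key, (kind, value) in emitted:
        if kind == "node":
            links[key] = value
        else:
            ranks[key] += value
    return {n: (ranks[n], out) for n, out in links.items()}

split1 = {"a": (0.25, ["b", "c"]), "b": (0.25, ["c"])}
split2 = {"c": (0.25, ["a"]), "d": (0.25, ["a", "c"])}
new_state = reduce_all(map_split(split1) + map_split(split2))
```

In the first split, node c receives contributions from both a and b, but only one aggregated message for c leaves the Mapper.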

 

Applications:

Graph Analysis, Web Indexing

 

Distinct Values (Unique Items Counting)

Problem Statement: There is a set of records that contain fields F and G. Count the total number of unique values of field F for each subset of records that have the same G (grouped by G).

The problem can be generalized a little and formulated in terms of faceted search:

Problem Statement: There is a set of records. Each record has a field F and an arbitrary number of category labels G = {G1, G2, …}. Count the total number of unique values of field F for each subset of records for each value of any label. Example:

 

Record 1: F=1, G={a, b}
Record 2: F=2, G={a, d, e}
Record 3: F=1, G={b}
Record 4: F=3, G={a, b}

Result:
a -> 3   // F=1, F=2, F=3
b -> 2   // F=1, F=3
d -> 1   // F=2
e -> 1   // F=2

 

Solution I:
The first approach is to solve the problem in two stages. In the first stage, the Mapper emits dummy counters for each pair of F and G, and the Reducer calculates the total number of occurrences of each such pair. The main goal of this phase is to guarantee the uniqueness of F values. In the second phase, pairs are grouped by G and the total number of items in each group is calculated. (Translator's note: this approach suits data sets with a high degree of duplication; if there is little duplication, the counts can be computed directly.)

Phase I:

 

class Mapper
   method Map(null, record [value f, categories [g1, g2,...]])
      for all category g in [g1, g2,...]
         Emit(record [g, f], count 1)

class Reducer
   method Reduce(record [g, f], counts [n1, n2, ...])
      Emit(record [g, f], null )

 

Phase II:

 

class Mapper
   method Map(record [g, f], null)   // same [g, f] order as the Phase I output
      Emit(value g, count 1)

class Reducer
   method Reduce(value g, counts [n1, n2,...])
      Emit(value g, sum( [n1, n2,...] ) )
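The two phases can be chained in a short Python sketch, run here on the example records given earlier. The shuffle is imitated with a dictionary. The function names are hypothetical, and each record is assumed to be an (f, categories) pair.

```python
from collections import defaultdict

def group_by_key(pairs):
    """Stand-in for the shuffle phase: collect values per key."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups

def phase1(records):
    """Emit one (g, f) pair per record and category, then keep each distinct pair once."""
    emitted = [((g, f), 1) for f, categories in records for g in categories]
    return list(group_by_key(emitted).keys())

def phase2(unique_pairs):
    """Count the distinct f values per category g."""
    emitted = [(g, 1) for g, _f in unique_pairs]
    return {g: sum(ones) for g, ones in group_by_key(emitted).items()}

records = [(1, ["a", "b"]), (2, ["a", "d", "e"]), (1, ["b"]), (3, ["a", "b"])]
result = phase2(phase1(records))
```

The pair (b, 1) is emitted twice in the first phase (by records 1 and 3) but survives only once, which is why category b ends up with a count of 2 rather than 3.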

 

Solution II:
The second solution requires only one MapReduce job, but it is not really scalable and its applicability is limited. The algorithm is simple: the Mapper emits values and categories, and the Reducer excludes duplicates from the list of categories for each value and increments the counter for each category. The final step is to sum all the counters emitted by the Reducers. This approach is applicable if the number of records with the same f value is not very high and the total number of categories is also limited. For instance, this approach is applicable to processing web logs and classifying users: the total number of users is high, but the number of events for one user is limited, as is the number of categories to classify by. It is worth noting that Combiners can be used in this schema to exclude duplicates from category lists before the data is transmitted to the Reducer.
class Mapper
   method Map(null, record [value f, categories [g1, g2,...]] )
      for all category g in [g1, g2,...]
          Emit(value f, category g)

class Reducer
   method Initialize
      H = new AssociativeArray : category -> count
   method Reduce(value f, categories [g1, g2,...])
      [g1', g2',..] = ExcludeDuplicates( [g1, g2,..] )
      for all category g in [g1', g2',...]
         H{g} = H{g} + 1
   method Close
      for all category g in H do
         Emit(category g, count H{g})

 

Applications:

Log Analysis, Unique Users Counting

 

Cross-Correlation

Problem Statement: There is a set of tuples of items. For each possible pair of items, calculate the number of tuples in which these items co-occur. If the total number of items is N, then N*N values should be reported.

This problem appears in text analysis (say, items are words and tuples are sentences) and market analysis (customers who buy this tend to also buy that). If N*N is quite small and such a matrix can fit in the memory of a single machine, then the implementation is straightforward.

Pairs Approach

The first approach is to emit all pairs and dummy counters from the Mappers and to sum these counters in the Reducer. The shortcomings are:

  • The benefit from Combiners is limited, as it is likely that all pairs are distinct.
  • There is no in-memory accumulation.

 

class Mapper
   method Map(null, items [i1, i2,...] )
      for all item i in [i1, i2,...]
         for all item j in [i1, i2,...]
            Emit(pair [i j], count 1)

class Reducer
   method Reduce(pair [i j], counts [c1, c2,...])
      s = sum([c1, c2,...])
      Emit(pair[i j], count s)

 

Stripes Approach

The second approach is to group data by the first item in the pair and maintain an associative array ("stripe") in which the counters for all adjacent items are accumulated. The Reducer receives all stripes for the leading item i, merges them, and emits the same result as in the Pairs approach.

  • Generates fewer intermediate keys. Hence the framework has less sorting to do.
  • Greatly benefits from Combiners.
  • Performs in-memory accumulation. This can lead to problems if not properly implemented.
  • More complex implementation.
  • In general, "stripes" is faster than "pairs".

 

class Mapper
   method Map(null, items [i1, i2,...] )
      for all item i in [i1, i2,...]
         H = new AssociativeArray : item -> counter
         for all item j in [i1, i2,...]
            H{j} = H{j} + 1
         Emit(item i, stripe H)

class Reducer
   method Reduce(item i, stripes [H1, H2,...])
      H = new AssociativeArray : item -> counter
      H = merge-sum( [H1, H2,...] )
      for all item j in H.keys()
         Emit(pair [i j], H{j})
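The Stripes approach can be imitated locally with Python's Counter as the associative array. Like the pseudocode, this sketch counts every item j in the tuple, including i itself, and it assumes that items do not repeat within a tuple. The function names are hypothetical.

```python
from collections import Counter, defaultdict

def map_stripes(items):
    """Mapper: emit one stripe per leading item i in the tuple."""
    for i in items:
        yield i, Counter(items)      # stripe: co-occurrence counts for item i in this tuple

def reduce_stripes(stripes_by_item):
    """Reducer: merge the stripes of each leading item and emit pair counts."""
    result = {}
    for i, stripes in stripes_by_item.items():
        merged = Counter()
        for stripe in stripes:
            merged.update(stripe)     # element-wise sum of the stripes
        for j, count in merged.items():
            result[(i, j)] = count
    return result

def cooccurrence(tuples):
    grouped = defaultdict(list)       # stands in for the shuffle phase
    for items in tuples:
        for i, stripe in map_stripes(items):
            grouped[i].append(stripe)
    return reduce_stripes(grouped)

counts = cooccurrence([["a", "b"], ["a", "b", "c"], ["b", "c"]])
```

The diagonal entries such as (b, b) simply count the tuples that contain the item. They can be dropped if only pairs of different items are of interest.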

 

Applications:

Text Analysis, Market Analysis

References:

  1. Lin J., Dyer C., Hirst G. Data Intensive Processing MapReduce

 

Relational MapReduce Patterns

In this section we go through the main relational operators and discuss how these operators can be implemented in MapReduce terms.

Selection

 

class Mapper
   method Map(rowkey key, tuple t)
      if t satisfies the predicate
         Emit(tuple t, null)

 

Projection

Projection is just a little bit more complex than selection, because in this case we should use a Reducer to eliminate possible duplicates.

class Mapper
   method Map(rowkey key, tuple t)
      tuple g = project(t)  // extract required fields to tuple g
      Emit(tuple g, null)

class Reducer
   method Reduce(tuple t, array n)   // n is an array of nulls
      Emit(tuple t, null)

 

Union

Mappers are fed all records of the two sets to be united. The Reducer is used to eliminate duplicates.
class Mapper
   method Map(rowkey key, tuple t)
      Emit(tuple t, null)

class Reducer
   method Reduce(tuple t, array n)   // n is an array of one or two nulls
      Emit(tuple t, null)

 

Intersection

Mappers are fed all records of the two sets to be intersected. The Reducer emits only records that occurred twice. This is possible only if both sets contain the record, because a record includes a primary key and can occur only once in each set.

class Mapper
   method Map(rowkey key, tuple t)
      Emit(tuple t, null)

class Reducer
   method Reduce(tuple t, array n)   // n is an array of one or two nulls
      if n.size() = 2
          Emit(tuple t, null)

 

Difference

Suppose we have two sets of records, R and S, and we want to compute the difference R - S. The Mapper emits all tuples, each with a tag that is the name of the set the record came from. The Reducer emits only records that came from R but not from S.

class Mapper
   method Map(rowkey key, tuple t)
      Emit(tuple t, string t.SetName)    // t.SetName is either 'R' or 'S'

class Reducer
   method Reduce(tuple t, array n) // array n can be ['R'], ['S'], ['R', 'S'], or ['S', 'R']
      if n.size() = 1 and n[1] = 'R'   // n[1] is the first (and only) element
          Emit(tuple t, null)
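Union, intersection, and difference share the same shape: the tuple itself is the key, and the Reducer only inspects what arrived for it. The Python sketch below assumes, as the text does, that a tuple occurs at most once within each set. Tagging every tuple with its set name (needed only for the difference) is reused for all three operators for brevity.

```python
from collections import defaultdict

def shuffle_tagged(R, S):
    """Mapper + shuffle: emit every tuple as a key, tagged with the set it came from."""
    groups = defaultdict(list)
    for tag, rows in (("R", R), ("S", S)):
        for t in rows:
            groups[t].append(tag)
    return groups

def union(R, S):
    return sorted(shuffle_tagged(R, S))                       # each key is emitted once

def intersection(R, S):
    return sorted(t for t, tags in shuffle_tagged(R, S).items() if len(tags) == 2)

def difference(R, S):
    return sorted(t for t, tags in shuffle_tagged(R, S).items() if tags == ["R"])

R = [(1, "x"), (2, "y"), (3, "z")]
S = [(2, "y"), (4, "w")]
```

The results are sorted only to make the output deterministic for inspection. The relational operators themselves do not require any order.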

 

GroupBy and Aggregation

Grouping and aggregation can be performed in one MapReduce job as follows. The Mapper extracts from each tuple the values to group by and to aggregate, and emits them. The Reducer receives the values to be aggregated, already grouped, and calculates an aggregation function. Typical aggregation functions like sum or max can be calculated in a streaming fashion and hence don't require all values to be held simultaneously. Nevertheless, in some cases a two-phase MapReduce job may be required; see the Distinct Values pattern for an example.

class Mapper
   method Map(null, tuple [value GroupBy, value AggregateBy, value ...])
      Emit(value GroupBy, value AggregateBy)

class Reducer
   method Reduce(value GroupBy, [v1, v2,...])
      Emit(value GroupBy, aggregate( [v1, v2,...] ) )  // aggregate() : sum(), max(),...
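A streaming Reducer for grouping and aggregation can be sketched in Python as below. The field names region and latency_ms are invented for the example, and the Reducer consumes an iterator so that it keeps only running aggregates rather than the full list of values.

```python
from collections import defaultdict

def map_tuple(row):
    """Mapper: keep only the grouping field and the field to aggregate."""
    return row["region"], row["latency_ms"]

def reduce_streaming(values):
    """Reducer: one pass over the values, keeping only running aggregates."""
    total, largest, count = 0, None, 0
    for v in values:
        total += v
        largest = v if largest is None else max(largest, v)
        count += 1
    return {"sum": total, "max": largest, "avg": total / count}

def group_and_aggregate(rows):
    groups = defaultdict(list)                 # stands in for the shuffle phase
    for row in rows:
        key, value = map_tuple(row)
        groups[key].append(value)
    return {key: reduce_streaming(iter(vals)) for key, vals in groups.items()}

rows = [{"region": "eu", "latency_ms": 120}, {"region": "us", "latency_ms": 80},
        {"region": "eu", "latency_ms": 60}, {"region": "us", "latency_ms": 100}]
stats = group_and_aggregate(rows)
```

The average is derived from a running sum and count, so it fits the same streaming style as sum and max.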

 

 

Joining

Joins are perfectly possible in the MapReduce framework, but there exist a number of techniques that differ in efficiency and in the data volumes they are oriented toward. In this section we study some basic approaches. The references section contains links to detailed studies of join techniques.

Repartition Join (Reduce Join, Sort-Merge Join)

This algorithm joins two sets R and L on some key k. The Mapper goes through all tuples from R and L, extracts key k from each tuple, marks the tuple with a tag that indicates the set it came from ('R' or 'L'), and emits the tagged tuple using k as the key. The Reducer receives all tuples for a particular key k and puts them into two buckets, one for R and one for L. When the two buckets are filled, the Reducer runs a nested loop over them and emits a cross join of the buckets. Each emitted tuple is a concatenation of the R-tuple, the L-tuple, and key k. This approach has the following disadvantages:

  • The Mapper emits absolutely all data, even for keys that occur in only one set and have no pair in the other.
  • The Reducer has to hold all data for one key in memory. If the data doesn't fit in memory, it is the Reducer's responsibility to handle this with some kind of swap.

Nevertheless, Repartition Join is the most generic technique and can be used successfully when other, optimized techniques are not applicable.
class Mapper
   method Map(null, tuple [join_key k, value v1, value v2,...])
      Emit(join_key k, tagged_tuple [set_name tag, values [v1, v2, ...] ] )

class Reducer
   method Reduce(join_key k, tagged_tuples [t1, t2,...])
      H = new AssociativeArray : set_name -> values
      for all tagged_tuple t in [t1, t2,...]     // separate values into 2 arrays
         H{t.tag}.add(t.values)
      for all values r in H{'R'}                 // produce a cross-join of the two arrays
         for all values l in H{'L'}
            Emit(null, [k r l] )
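A local Python imitation of the Repartition Join might look like this. It assumes that each tuple starts with the join key and that all values for one key fit in memory. The sample data (users in R, log events in L) is made up for illustration.

```python
from collections import defaultdict

def map_tagged(tag, rows):
    """Mapper: emit (join key, (set tag, remaining values)) for every tuple."""
    for key, *values in rows:
        yield key, (tag, tuple(values))

def reduce_join(key, tagged):
    """Reducer: split the values into two buckets, then emit their cross product."""
    buckets = defaultdict(list)
    for tag, values in tagged:
        buckets[tag].append(values)
    return [(key, r, l) for r in buckets["R"] for l in buckets["L"]]

def repartition_join(R, L):
    groups = defaultdict(list)                  # stands in for the shuffle phase
    for tag, rows in (("R", R), ("L", L)):
        for key, tagged in map_tagged(tag, rows):
            groups[key].append(tagged)
    joined = []
    for key in sorted(groups):
        joined.extend(reduce_join(key, groups[key]))
    return joined

R = [(1, "alice"), (2, "bob"), (3, "carol")]
L = [(1, "login"), (1, "click"), (2, "login"), (4, "logout")]
joined = repartition_join(R, L)
```

Keys 3 and 4 still travel through the shuffle even though they produce no output, which illustrates the first disadvantage listed above.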

 

Replicated Join (Map Join, Hash Join)

In practice, it is typical to join a small set with a large one (say, a list of users with a list of log records). Let's assume that we join two sets, R and L, and that R is relatively small. If so, R can be distributed to all Mappers, and each Mapper can load it and index it by the join key. The most common and efficient indexing technique here is a hash table. After this, the Mapper goes through the tuples of set L and joins them with the corresponding tuples from R that are stored in the hash table. This approach is very effective because there is no need to sort set L or transmit it over the network, but set R has to be small enough to be distributed to all Mappers.
 
class Mapper
   method Initialize
      H = new AssociativeArray : join_key -> tuple from R
      R = loadR()
      for all [ join_key k, tuple [r1, r2,...] ] in R
         H{k} = H{k}.append( [r1, r2,...] )

   method Map(join_key k, tuple l)
      for all tuple r in H{k}
         Emit(null, tuple [k r l] )
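The map-side join can be sketched in Python with a plain dictionary as the hash table. The class name ReplicatedJoinMapper and the sample data are hypothetical. In a real job every Mapper would load its own copy of R, for example from a distributed cache.

```python
from collections import defaultdict

class ReplicatedJoinMapper:
    def __init__(self, small_set):
        """Initialize: index the small set R by join key in a hash table."""
        self.index = defaultdict(list)
        for key, *values in small_set:
            self.index[key].append(tuple(values))

    def map(self, key, l_tuple):
        """Map: join one tuple of the large set L against the in-memory index."""
        for r in self.index.get(key, []):
            yield key, r, l_tuple

R = [(1, "alice"), (2, "bob")]
L = [(1, ("login",)), (3, ("login",)), (2, ("click",)), (1, ("logout",))]

mapper = ReplicatedJoinMapper(R)      # every mapper would load its own copy of R
joined = [row for key, l in L for row in mapper.map(key, l)]
```

No Reducer is involved: the output order simply follows the order of L, and tuples of L without a match in R (key 3 here) are dropped, as in an inner join.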

 

References:
  1. Join Algorithms using Map/Reduce
  2. Optimizing Joins in a MapReduce Environment



