Google MapReduce 论文阅读笔记

原论文

Dean, Jeffrey, and Sanjay Ghemawat. “MapReduce: simplified data processing on large clusters.” Communications of the ACM 51.1 (2008): 107-113.

前言

Google 三驾马车之一,MIT 6.824 first day 的 preparation task。

Abstract

MapReduce is a programming model and an associated implementation for processing and generating large data sets. Users specify a map function that processes a key/value pair to generate a set of intermediate key/value pairs, and a reduce function that merges all intermediate values associated with the same intermediate key.

MapReduce 通过 Map 函数对一个基于 k-v 对的数据集进行处理,生成对应的中间数据集,再通过 Reduce 函数对这些中间数据集中具有相同的 key 的 value 进行合并。

关注点

The run-time system takes care of the details of partitioning the input data, scheduling the program’s execution across a set of machines, handling machine failures, and managing the required inter-machine communication.

  • 如何分割输入数据。
  • 分布式集群的调度。
  • 机器的错误处理。
  • 集群机器间的通信。

解决问题

The input data is usually large and the computations have to be distributed across hundreds or thousands of machines in order to finish in a reasonable amount of time. The issues of how to parallelize the computation, distribute the data, and handlefailures conspire to obscure the original simple computation with large amounts of complex code to deal with these issues.

输入的数据量巨大,难以在单机下完成,如果想在可接受的时间内完成,需要将计算分给成百上千的机器上。

Programming Model

MapReduce 的模型原理是:对 input key/value pairs 对进行处理,生成对应的 output key/value pairs,这两步通过 Map 函数和 Reduce 函数来完成。

  • Map:由用户编写,接受一个 input key/value pair ,生成一个 intermediate key/value pairs 的集合,MapReduce Libray 将所有具有相同 intermediate key 的 value 集合一起后传递给 Reduce 函数。
  • Reduce:由用户编写,接受一个 intermediate key 和一个对应这个 key 的 value 集合,Reduce 函数会合并这些 value,生成一个较小的 value 集合。一般而言每次 Reduce 只产生 0 个或 1 个 output value。通常会使用迭代器将 intermediate value 传递给 Reduce 函数,这样就可以处理一些大到无法全部放入内存的 value 集合了。

Example

例如,统计一个很大的文档集合中每个单词的出现次数,伪代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
map(String key, String value):
// key: document name
// value: document contents
for each word w in value:
EmitIntermediate(w, "1");

reduce(String key, Iterator values):
// key: a word
// values: a list of counts
int result = 0;
for each v in values:
result += ParseInt(v);
Emit(AsString(result));

map( ) 会输出文档中的每个词以及这个词的出现次数,在上面的例子中出现次数就是 1 。

reduce( ) 则把 map( ) 生成的每个词的出现次数进行累加。

Type

尽上述伪代码中的输入与输出都是 String,但是 map() 和 reduce() 的输入属于是有具体类型的:

1
2
map (k1,v1) → list(k2,v2)
reduce (k2,list(v2)) → list(v2)

More Examples

更多 MapReduce 的例子如下:

  • Distributed Grep:Map 函数输出符合匹配规则的一行,Reduce 函数将中间数据复制到输出中。
  • Count of URL Access Frequency:Map 函数处理网页的访问日志,输出 (URL,1),Reduce 函数将相同 URL 的 value 进行累加,得到 (URL,total count)。
  • Reverse Web-Link Graph
  • Term-Vector per Host
  • Inverted Index
  • Distributed Sort

Implementation

ExecutionOverview

执行流程如下:

  1. 将 input files 分割成 M 片,每个数据片一般为 16MB ~ 64MB,接着会在集群中创建多个程序副本。
  2. 这些程序副本中有一个特殊的 master,其他的为 worker,master 会将任务分配给 worker。一共有 M 个 map 任务或 R 个 reduce 任务,master 会将一个 map 或 reduce 任务分配给一个闲置的 worker。
  3. 被分配了 map 任务的 worker 会读入对应的 input files split 内容,解析出 key/value paris ,然后将其传给用户定义的 Map 函数,由 map 函数生成的 intermediate key/value pairs 会被缓存至内存中。
  4. 内存中缓存的 key/value pairs 会被 partitioning function 分成 R 个 regions ,并被周期性地写入到本地磁盘上。这些 key/value pairs 在磁盘上的存储位置会被回传给 master,然后由 master 负责将存储位置传给 reduce worker。
  5. Reduce worker 收到 master 发来的数据存储位置后,通过 RPC 从 map worker 所在主机的磁盘上读取这些缓存数据。当 reduce worker 读取了所有的 intermediate key/value pairs 后,会对 key 进行排序令所有具有相同 key 的数据聚集在一起。要进行排序的原因是,通常而言许多不同 key 的数据会被映射到同一 reduce 任务上。如果数据过大无法在内存中完成排序,则需要进行外部排序。
  6. Reduce worker 会遍历排序后的中间数据,对于每一个唯一的 intermediate key,redcuer worker 会将这个 key 和对应的 value 集合传给用户定义的 Reduce 函数。Reduce 函数的输出结果会被追加到这个 reduce 分区中的一个最终 output file。
  7. 当所有的 map 和 reduce 任务都完成后,master 会唤醒用户程序,此时,用户程序对 MapReduce 的调用才会返回。

当成功完成调用后,MapReduce 的输出结果会被存放至 R 个 output files 中(每个 Reduce 对应一个 files)。通常而言,用户不需要将这 R 个 output files 进行合并,因为用户经常会将这些文件再作为其他 MapReduce 的输入。

Master Data Structures

存储了每个 Map 和 Reduce 任务的状态(idle, in-progress, completed),以及 woker 机器的标识。

Master 就像一个 conduit,将 intermediate file 的存储位置从 Map 传递给 Reduce。对于每一个完成的 Map 任务,master 存储了其产生的 R 个 intermediate file 的大小与位置。当 Map 任务完成时,master 会接收到代大小和位置的更新信息,这些信息会被以增量推送的形式 push 给处于 in-progress 状态的 worker。

Fault Tolerance

Worker Failure

Master 会周期性地对每个 worker 发送 ping 命令。如果在一个时间范围内 master 没有收到来自 worker 的回复,master 就会将这个 worker 标记为失效状态(failed)。这个失效的 worker 的完成的所有 Map 任务都将被重设为初始的 idle 状态,然后这些任务就可以被分配给其他的 worker,Reduce 任务也是同样的。

注意,当 worker 故障时,由于 Map 的输出存储在机器的本地磁盘上,所以必须重新执行,而 Reduce 的输出则存储在一个 global file system 中, 不必重新执行。

Master Failure

应对 master 故障的一个简单有效的方法是将 Master Data Structures 定期写入到磁盘上,即在磁盘上写入一个 checkpoint。如果 master 任务失效了,就可以从最后一个 checkpoint 上启动另一个 master。

作者的实现如果 master 失效,就会中止所有 MapReduce 计算,用户可以检查到这个失效状态,并根据需要来决定是否需要重新执行 MapReduce。

Semantics in the Presence of Failures

主要是解决多次 Map 或 Reduce 任务提交的幂等性问题,通过以下方式来保证原子性:

  • Map:如果 master 再次接受到同个 Map 任务的完成消息,则会忽略。
  • Reduce:Redcue 任务则通过原子方式将临时文件重命名为最终输出文件。

Locality

尽量将输入数据存储在本地磁盘上,避免通过网络传输,且 Map 任务通常在包含了输入数据的机器或就近的机器上执行。这样可以保证大部分的输入数据都能从本地机器读取,节省网络传输消耗。

Task Granularity

理想中情况下,M 和 R 应该比集群中 worker 数量要多得多,即保证任务粒度越小越好。

但是 master 中因为要执行 O(M+R) 次调度,从而内存中需要保存 O(M*R) 个状态,因此对 M 和 R 的实际取值是有一定限制的。

R 通常由用户指定,M 则需要取一个合适值,使得每一个 Map 任务都是处理大约 16MB ~ 64MB 的输入数据。

作者推荐的示例是,2000 台 worker 时,M = 200000,R = 5000。

Backup Tasks

有时候 MapReduce 中会出现这么一种情况:某台机器需要花费很长的时间才能完成最后的几个 Map 和 Reduce任务,从而拖累了整个 MapReduce 的执行时间,这种机器被称为 straggler 。通常 straggler 的出现原因是由于机器磁盘出现问题,或集群调度系统在这台机器上又分配了其他任务等等。

作者使用了一个通用机制来解决 straggler 问题,具体实现是通过在 MapReduce 快完成时,master 会把处于 in-progress,即仍在处理中的任务进行备份,通过 backup 进程执行。无论是原始进程还是 backup 进程执行完成,都视做整体的 MapReduce 完成。

这种备份处理机制只会占用比正常情况多几个百分点的计算资源,在处理某些任务,比如后文的排序任务时,关闭了备份处理机制的情况要比正常情况多花费 44% 时间完成。

Refinements

Partitioning Function

一个默认的分区函数是 hash(key) mod R ,然而在某些情况下,其他一些分区函数会有奇效。

比如,输出的 key 为URLs,我们希望所有同个主机的 entries 都存放在同个 output file 中,此时可以使用 hash(Hostname(urlkey)) mod R 作为分区函数,这样就可以保证所有来自同个主机的 URLs 都保存在至同个 output file 中。

Ordering Guarantees

作者确保在一个给定的 partition 内,intermediate key/value pairs 是按照 key 的递增顺序处理的。

Combiner Function

Input and Output Types

MapReduce Library 支持多种不同的输入数据形式,例如,”text” mode 的输入的每一行为一个 key/value pair,其中 key 为文件的 offset,value 为行的内容。

Side-effects

Skipping Bad Records

Local Execution

Status Information

Counters

MapReduce Library 提供了一种 Counter Facility 来统计不同事件发生的次数,比如用户可能想统计已经处理了多少个单词、已经索引了多少篇文档等。

为了使用 Counter Facility ,用户需要在程序中创建一个具名的 counter 对象,并在 Map 和 Reduce 函数中进行自增:

1
2
3
4
5
6
7
8
Counter* uppercase;
uppercase = GetCounter("uppercase");

map(String name, String contents):
for each word w in contents:
if (IsCapitalized(w)):
uppercase->Increment();
EmitIntermediate(w, "1");

这些 counter 的值会周期性地从各个 worker 中传给 master (在 ping 包中),master 对这些执行成功的 Map 和 Reduce 任务的 counter 值进行累加,当一个 MapReduce 完成后,再返回给用户。

Counter 的当前值也会显示在 master 的状态页上,这样用户就能看到当前的计算进度。当对 counter 值进行累加时,master 需要检查重复执行的 Map 和 Reduce 任务,避免重复累加。

Counter Facility 对于 MapReduce 的完整性检查十分有用,例如有时候用户需要确保 output key/value pairs 的数量精确等于 input key/value paris。

Performance

作者通过两个计算任务来衡量 MapReduce 的性能,一个是在大约 1TB 的数据中进行特定的模式匹配,另一个是对大约 1TB 的数据进行排序。

Cluster Configuration

1800 台机器组成的集群,每台机器配置 2 个 2Ghz 的 Intel Xeon CPU,4GB 内存,两个 160GB 硬盘和一个千兆网卡。

GREP

grep 任务需要扫描大概 10^10 个 100-byte 大小的记录,从中查找出现概率较小的 3 个字符(出现 92337 次)。输入数据以大约 64MB 的大小进行分割(M = 15000),输出数据保存至一个文件中(R = 1)。

下图显示了 grep 的计算过程,Y 轴为输入数据的处理速度,处理速度随着参与计算的机器数量而增加,当有 1764 台 worker 参与计算时,计算速度达到了峰值 30GB/s。当 Map 任务结束时,即在约 80 秒后,处理速度降为 0。

整个计算过程大约耗费了 150 秒,包括了大约 60 秒的启动阶段。

grep

Sort

sort 任务也需要处理 10^10 个 100-byte 大小的记录,Map 函数先从文本行中解析出 100-byte 大小的 key 作为排序的依据,再把这个 key 与对应的文本行作为 intermediate key/value pair 输出。Reduce 函数则不对 intermediate key/value pair 进行任何改变,直接输出。最终的排序结果会输出到一个 2-way replicated GFS files 中,即输出的数据为 2TB。

输入数据以大约 64MB 的大小进行分割(M = 15000),排序后的输出数据则保存至 4000 个文件中(R = 4000)。

下图 (a) 显示了正常的执行过程, (b) 显示了没有备份处理机制的执行过程,(c) 显示了模拟部分 worker 故障的执行过程。

可以看出,(b) 的耗时比 (a) 增加了许多(约 44%), (c) 的图中显示了一些为负的处理速度,这是由于执行 Map 任务的 worker 被 kill 了,Map 任务丢失而导致的,(c) 的耗时比 (a) 大约多了 5%。

sort

Experience

MapReduce Library 从 2003 年 1 月完成了第一个版本,之后在 Google 内部各个领域得到了广泛应用,包括:

  • 大规模的机器学习问题。
  • Google News 和 Froogle 的集群问题
  • 从用户查询产品的报告中抽取数据,例如 Google Zeitgeist
  • 从大量应用和新产品的网页中提取信息
  • 大规模的图形计算

上图: MapReduce 实例的增长情况。

下图:2004 年 8 月的 MapReduce Library 使用资源情况。

jobs

Large-Scale Indexing

目前为止,MapReduce 在 Google 内部最有意义的应用就是重写了 Google web search service 使用的 Index system。

Index system 的输入是爬虫爬取的海量文档,这些文档保存在 GFS 中,超过 20TB,Index system 通过一系列的 MapReduce 来建立索引(大约 5 到 10 次)。

Conclusions

MapReduce programming model 在 Google 内部得到了广泛的成功应用,作者将这些成功归为以下几个方面:

  • 高度封装,没有分布式经验的程序员也能十分容易地使用。
  • 可处理大量不同类型问题,例如可以生成用于 Google web search service 使用的数据、用于排序、用于数据挖掘、用于机器学习的数据等等。
  • 在数千台机器组成的大型集群上部署了 MapReduce 实现,这样能更有效地利用这些计算资源,且能在处理其他需要大量计算的问题上用到。

作者在 MapReduce 的开发过程中学到了以下几点:

  • Restricting 编程模型使得并行和分布式计算变得容易,也易于构建具有容错性的计算环境。
  • 网络带宽是稀有资源,MapReduce 中的许多优化都是为了减少网络传输,例如本地化读取策略,中间文件写入本地磁盘、只写入一份中间文件等。
  • 多次执行相同任务可以减少性能缓慢的机器带来的影响,同时还能解决由于机器故障导致的数据丢失问题。
  • 本文作者: Marticles
  • 版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 3.0 许可协议。转载请注明出处!