Hygao's Blog

论文下载

前言

最近拜读了 Google 三驾马车中的 MapReduce,这个被发布在 2004 年的论文中介绍了一个工作在分布式文件系统 GFS 之上的“批处理”框架。尽管由于各种原因, Google 在很久前就声称内部不再使用 MapReduce 了,但因为这个框架非常经典,以及当前依然有很多系统将它作为执行引擎(比如一些框架在 MapReduce 上添加 DSL 来做声明式系统),我认为研究这个框架仍然具有很大的意义。

概述

MapReduce 非常有趣的一点在于,它试图为使用者屏蔽掉分布式带来的大多数问题(没有完全屏蔽,因为 Partition 函数或 Combiner 函数都要求用户对任务在节点上的分布有一个明确的感知),诸如拆分输入、并行执行、错误处理、节点通信等底层的问题都由框架来处理。在使用上,它要求用户至少提供一个 Map 任务和一个 Reduce 任务,通常而言,Map 用于处理并行任务,而 Reduce 则将 Map 的输出进行聚合并产生最终的输出。

框架会将这些任务分配给集群中的多个节点来做分布式的执行。这里借用论文中提供的执行说明图,来简单阐述下框架的执行过程(结合论文 Appendix A 中的代码来看这幅图会更好一些,但由于篇幅原因,这里就不把相应的内容贴过来了):

MapReduce 执行过程

首先,对于一个任务而言,集群中存在 master 和 worker 两类节点(为了方便描述,这里将“分配到 master 程序的节点”称为 master 节点,worker 同理),顾名思义,master 负责协调任务的执行,worker 则是干活的。从论文提供的代码中可以看到,worker 的数量可以通过 spec.set_machines(<数量>) 来设置,不过注释中提到这里是设置“最多使用的机器数量”。与之相对的,master 则只有一个。

用户要做的任务是定义 Map 和 Reduce,然后声明输入文件和输出文件的名字,根据论文提供的代码,用户可以通过 out->set_filebase(<前缀>) 声明输出文件的前缀,比如 out->set_filebase("/gfs/test/freq"),而输出文件剩余的部分取决于 reduce 的数量,因为一个 reduce 对应一个输出文件。如果有 100 个 reduce,那么最终就会生成 /gfs/test/freq-00000-of-00100、/gfs/test/freq-00001-of-00100 这样的文件。不过不清楚为什么要从 0 开始,如果这样那么最后一个文件的名字可能就是 /gfs/test/freq-00099-of-00100,看起来怪怪的。

当用户定义了 Map 和 Reduce 任务后,master 会给 worker 分配任务,由于通常而言 map 和 reduce 的数量和都是大于 worker 的数量的,所以一个 worker 可能会被分配到多个任务。那么 map 和 reduce 的数量是如何确定的呢?对于 map 而言,数量是自动确定的。框架会把输入文件拆分成 16~64 MB 的一个个输入块(split),每个块的具体大小由用户控制。这样一来,只要知道输入文件的大小,就可以知道需要被分割成多少个输入块,而由于一个 map 任务处理一个输入块,所以 map 任务的数量也就知道了;与之相对的,reduce 任务的数量则由用户通过 out->set_num_tasks(<数量>) 自己定义。

Map 任务读取输入,根据配置好的规则将输入解析成一个一个的 Key-Value pair,然后用这个 k-v pair 作为入参调用用户提供的代码来产生输出,输出同样以 k-v pair 的形式表示,首先会被写入到内存中,然后由框架周期性地将这些结果写入到本地磁盘(注意和 GFS 的全局存储区分开)中。在写入的过程中会调用用户提供的分区函数对输出进行分区,比如 hash(Key) % <Reduce 任务的数量>,因此一个 map 任务可能会产生很多分区。map 任务产生的这些文件对应的位置会上报给 master 节点,然后 master 节点会将各个分区文件的位置发给对应的 reduce 任务。

当 reduce 任务收到 master 节点的通知时,它会通过 RPC 来从各个 map 任务那里拉取属于自己分区的文件。而当 reduce 任务拉取到所有的文件时,它会对这些文件中的 k-v pair 做排序,排序的结果使得所有具有相同 key 的 value 被聚合在一起,而 key 本身则也根据其语义被排序。对于排序本身而言,根据内存中是否能容纳所有的数据,reduce 任务会按需使用硬盘做辅助空间进行外部排序。

经过这样的处理后,框架会遍历排好序的结果,依次为其调用用户提供的 reduce 代码。代码接受一个 key 和一个迭代器作为参数,迭代器中的内容是属于这个 key 的所有 value,之所以要使用迭代器而不是一个简单的列表,是为了避免一个 key 中的所有 value 的数量过大,导致内存不能将其容纳进来。迭代器屏蔽了数据的来源,不管数据是来自于内存还是硬盘,在代码层面的处理都是相同的。

当分区中所有的内容都被用户提供的 reduce 任务处理后,就会在前文所述的输出文件中产生处理后的结果。而到此为止,整个 MapReduce 任务就结束了。

细节

前面描述了一次 MapReduce 任务的大概流程,在这个基础上,依然有一些细节问题值得探讨,下面按发生的顺序来一一阐述。

首先,Google 的 MapReduce 是运行在 GFS 集群上的,作为一个分布式的文件系统,GFS 将文件按 64 MB拆分成几个小块,并存储在不同的机器上,而这些机器很可能就被作为了 MapReduce 的 worker。这就给了框架进行优化的空间,具体来说,master 可以通过 GFS 获知到输入文件的布局,比如节点 A 上有文件块 B,而节点 A 又是一个 worker,那么 master 就可以把处理 B 的 map 分配到节点 A 上,这样 map 在读取输入块时就不需要通过网络来拉取数据,直接从本地就可以获取到,从而节省了不必要的网络传输。

其次,map 在读取输入时,实际上读取到的是经过拆分的输入块,该输入块的大小由用户来定义。但是,考虑这样一种情况,假设用户将输入块的大小定义为 16MB,但是这个输入块中最后一条记录是不完整的,对于这整条记录而言,它的前半部分在这个输入块中,而后半部分在下一个输入块中。在这样的情况下,如果依然严格按照 16MB 进行拆分,那么这两个输入块对应的 map 任务都会因为这条不完整的记录而出现问题。对于这个问题,论文 4.4 节表示输入是被一种叫做 reader 的组件来拆分的,而 reader 是知道如何将文件拆分成有意义的记录的(原文:Each input type implementation knows how to split itself into meaningful ranges for processing as separate map tasks),所以我猜测在 reader 这一层面会根据具体的需求灵活地分隔文件,也即前文所述的 16~64 MB 的限制并不是一个硬性限制。

然后,用户可以提供 Partition 函数来帮助 map 决定输出的内容需要被存储的位置。除此之外,用户还可以提供 Combiner 函数。关于这个组件的作用,论文 4.3 节有详细的描述,可以简单地将其理解为一种预处理。论文提供的代码中,被用作 reduce 任务的内容同样被作为了 combiner,这意味着 combiner 函数在执行前,map 产生的内容应该是被排好序的。如论文 4.2 节所言,给定一个分区,内部的 key 是有序的。那么结合 combiner 来看,这个有序就不仅指 reduce 收到的完整的 partition 内容是有序的,对于 map 产生的部分 partition,内部的内容也同样是有序的才对。

接着,MapReduce 提供一种备份任务机制。具体而言,当 MapReduce 过程近乎完成时,框架会把那些尚未完成的任务分配给空闲的 worker,这样同样的任务就有可能被多个 worker 执行,而任意一个 worker 执行成功,master 都会认为这个任务执行成功,从而避免某个 worker 因为各种外部原因(比如硬盘老化导致写入时要不停地进行纠错从而降低写入速度)拖慢整个 MapReduce 的执行速度。

最后,一些特殊的输入可能导致 map/reduce 任务异常退出,这可能是因为用户的代码有 bug,也可能是因为记录本身有问题。要避免因为这种问题导致 MapReduce 无法正常完成,最好的解决办法当然是找出 bug 或记录的问题,但除此之外,框架也允许用户跳过这个会导致问题的记录。具体而言,框架提供了一种特殊的运行模式,在这种模式下,如果 map/reduce 任务异常退出,那么 master 可以感知到是因为哪条记录,然后重新把任务分配给 worker 让其重新执行,如果这次执行同样发生了异常退出,那么 master 在下一次分配这个任务时会告知 worker 跳过这条有问题的记录,从而避免流程进入 执行-异常退出-执行 的死循环中。

错误处理

尽管在使用上基本类似于单机框架,但 MapReduce 本质上是作用在分布式环境下的,对于一个分布式系统而言,错误处理永远是一个特别重要的话题。下面来探讨下 MapReduce 框架是如何进行错误处理的。

在深入机制之前,我们首先要明确 MapReduce 是一个批处理框架,这意味着实时性并不是特别重要,如果用户提交了一个任务,需要几分钟乃至几个小时才能执行完成,这都是一个非常正常的情况。也正因为如此,MapReduce 的错误处理非常简单,最核心的就是重试

因为需要重试,所以 map/reduce 任务的类型是有限的,根据论文的描述,它应该是确定性(deterministic)的,也就是说,同一个任务不论执行多少次,产生的输出都应该是一样的。除此之外,我认为任务本身还应该是幂等的,这意味着类似“每处理一条记录就写一条日志来标识”的操作是有问题的,因为重试很可能导致某条记录对应的日志不唯一。

在机制上,master 节点会定期地 ping 一下所有的 worker 节点,如果某个节点没有回复响应,那么 master 就将这个 worker 标记为异常,然后将它上面运行的所有任务在正常的 worker 上重新执行。那么,重试对整个执行流程有什么影响吗?答案是没什么影响,我们分别针对 map 和 reduce 来讨论。

首先对于 map 而言,master 有这样一条限制:如果已经从一个 map 获取到了其上报的中间文件信息,且这个 map 所在的 worker 是正常的,那么会忽略不同 worker 上的同一个 map 上报的信息。这条限制也是前文所述的备份任务得以正常执行的前提。因此,如果一个 map 所在的 worker 异常了,那么 master 会将同样的 map 分配给其他的 worker,即便后面这个 worker 恢复了,master 也会从这两份 map 上报的信息中选择更早的那一份作为最终的文件路径。如前所述,这些输出文件是被保存在 map 所在 worker 自己的本地磁盘上的,所以只要两个 map 所在的 worker 不同,产生的文件就互不影响。而确定了最终的文件路径后,master 会将其通知给所有的 reduce,这时如果哪个 reduce 没有获取或尚未获取完这个 map 对应的输出文件,那么将继续从这个新的 worker 上拉取对应的输出文件。

然后是 reduce,为了避免因为重试导致的多个 reduce 实例一起写入同一个最终输出文件(例如 /gfs/test/freq-00001-of-00100),每个 reduce 实际上是先写一个相互隔离的临时文件的。也就是说,即便是同一个 reduce 任务的不同实例,写入的临时文件也是不同的。在整个 reduce 任务完成后,框架将这个临时文件更名为最终输出文件的名字,而对于 GFS 而言,更名操作是原子性的,这就保证了最终输出文件的完整性。

但是仔细考虑 reduce,会发现很可能最终 MapReduce 执行完成后,GFS 上因为重试而保存了多份同样的内容,其中之一作为最终的输出文件,剩下的都是临时文件,这显得有些冗余。但是,其实 GFS 本身保证写入操作的一致性就是“至少一次”,也就是说,使用 GFS 写入文件时(实际上是追加写),本身就可能产生多份重复的内容,只不过在读取时不会感知到。因此,MapReduce 导致的这份“冗余”,在这样的环境下就显得合情合理了。