7.3 MapReduce工作流程
(1) 首先从HDFS中读取数据,并对它做分片操作(split)
(2) 每个小分片单独启动一个map任务来处理此分片的数据。map任务的输入和输出都是key-value
(3) 把每个map输出的key-value都进行分区,然后做排序、归并、合并后,分发给所有reduce节点去处理——这个过程称为shuffle。因此map输出的分区数量取决于reduce机器(节点)的数量。
(4) reduce处理后的结果再写到HDFS中
注意:map之间是不会进行通信的,reduce之间也不直接信息交互。用户不能直接控制节点之间的数据交换,都由MapReduce框架自身实现以降低开发难度。
对于上段的过程,分阶段(模块)做更详细地说明。为方便,假设集群只包含两个节点
首先由InputFormat模块把文件从HDFS中读取出来,并进行格式验证。然后InputFormat还要把数据切分成多个分片split——注意这种切分只是一种逻辑定义,物理上并不发生移动。
由记录阅读器RR(Record Reader)根据split的位置长度信息,把split从HDFS中读出来,输出是key-value格式(因为map函数只接收key-value格式输入)。
将key-value输入到map函数中(处理逻辑由用户自定义),输出中间结果。
将中间结果做shuffle处理,即分区、排序、合并、归并,获得key-list[value]形式的结果。然后,把shuffle的结果分发给 各节点 的reduce任务(注意这时候会跨节点交互数据)。
reduce函数对输入数据进行分析(处理逻辑也是用户自定义的),分析结果以key-value格式输出。
OutputFormat对输出格式进行检查,并检查其他设置,如输出目录是否存在。检查通过后,把结果再写入HDFS中去。
一个大文件会被分成很多数据块Block存储在HDFS的各个数据节点DataNode中(每个Block都有多个冗余副本存储在不同DataNode)。比如一个文件被分为如下图的六个Block。而分片也是对原来整个文件做处理的,也就是把六个Block合并起来重新分配(只是逻辑上的合并不是物理上的)。比如下图的Split1包含全部Block1和部分Block2。
——也可以认为,逻辑层面上Block和Split没有关系。
分片数量由用户自定义。多个分片意味着可以并行处理文件,体现分布式计算的优势。但是分片也不是越多越好,因为分片数量就是Map任务数量,而Map任务之间切换要消耗管理资源。所以过多的分片会影响执行效率。
习惯上会把split的大小就设置得和block一样,一般是64MB或128MB,以尽量避免切分Block而增加传输开销。
Map的数量即Split的数量(上面已经解释)。
Reduce的数量由用户自定义。最优选择遵循的原则是,略小于集群中所有Reduce Slot的总量(考虑预留一些资源处理可能发生的错误)。
Shuffle先后分为Map端Shuffle和Reduce端Shuffle
Map端Shuffle经历如下过程
输入数据是由RecordReader处理得到的key-value,然后给到Map任务,Map函数由用户自定义,输出是list(<key, value>)。
为了降低磁盘寻址开销、提高效率,Map处理的结果并不直接写入磁盘,而是先写入缓存。
直到缓存即将写满,则触发溢写进程。首先对缓存中的数据做分区、排序和合并:
分区 是为了后面传给Reduce任务做准备,所以有几个Reduce Task就分几个区。默认采用Hash函数,可以用户自定义。
排序 是依据字典的key来做的。排序是系统默认操作,用户不须干预。
合并 (combine)可以减少键值对数量。比如有两个键值对<"a",1 >和<"a",2>表示字符出现的次数,经过合并操作可得到<"a",3>。这样可以减少后面写磁盘的开销。合并操作不是必须的,只有用户定义了才会执行。
执行完上述操作后,缓存中的数据被写入磁盘。需要注意的是,为了保证接收Map输出不中断,并不是把缓存彻底写满才触发溢写,而是在大约80%的时候就开始写磁盘,同时剩下20%继续接收Map输出。
溢写过程多次发生,则磁盘上会形成多个溢写文件。当文件数量大于某个值(用户可自定义),系统会将它们归并(merge)成一个大的文件存放在磁盘。这个大文件保持了前面分区、排序和合并的处理结果。
注意合并combine和归并merge的不同:比如对于输入<"a", 1>和<"a", 2>,combine的结果是键值对<"a", 3>,而merge的结果是<key, value-list>如<"a", <1,2>>
JobTracker会跟踪归并文件的生成,一旦探测到一个归并大文件完成,就会通知各Reduce任务将属于自己分区的部分拉走。Map端Shuffle就完成了。
Reduce端Shuffle的过程如下
当接到JobTracker通知,每个Reduce任务会从多个Map任务获得数据,这些数据首先也会被保存在缓存中。
当缓存达到一定大小,也会触发溢写操作:先做归并然后合并,最后写入磁盘。
于Map Shuffle一样,当磁盘中溢写文件数量达到用户设定值,则触发文件归并,最后把归并后的大文件输出给Reduce任务处理。
值得一提的是,如果领取的任务很小,甚至达不到缓存上限,那么系统会在缓存中做归并合并处理后,跳过溢写步骤,直接把数据传给Reduce任务。
上面描述的过程是从数据流角度看。而从系统角度看,MapReduce运行用户编写的应用程序过程如下:
用户启动MapReduce后,程序会被部署到不同的机器上去。一个机器会作为Master运行JobTracker,其他机器作为Worker运行TaskTracker
将Map Task和Reduce Task分配给各个Worker
从HDFS中读取的数据被InputFormat分成许多Split,这些数据被提交给Map任务(处理逻辑由用户编写),输入格式是key-value。Map任务数量和Split数量一致。输出为key-value列表
Map输出先保存到缓存,达到一定规模则触发溢写,即进行分区、排序、合并然后写入磁盘
数据在磁盘中归并后,由各Reduce任务取走各自分区的部分,然后执行用户自定义的Reduce函数来处理数据,最后输出key-value格式的结果
将结果文件写入HDFS中。
Reference:
https://www.icourse163.org/learn/XMU-1002335004#/learn/content?type=detail&id=1214310152&sm=1