MapReduce 知识

 我来答
舒适还明净的海鸥i
2022-07-15 · TA获得超过1.7万个赞
知道小有建树答主
回答量:380
采纳率:0%
帮助的人:70.1万
展开全部

客户端(client)
提交MapReduce作业
JobTracker
1.作业调度:将一个作业(Job)分成若干个子任务分发到taskTraker中去执行
2.任务监控:TaskTracker发送心跳给JobTracker报告自己的运行状态,以让JobTracker能够监控到他
3.资源管理:每个任务向JobTracker申请资源
4.监控过程中发现失败或者运行过慢的任务,对他进行重新启动
TaskTracker
主动发送心跳给jobTracker并与JobTracker通信,从而接受到JobTracker发送过来需要执行的任务

资源表示模型
用于描述资源表示形式,Hadoop1.0使用“槽位(slot)”组织各个节点的资源,为了简化资源的管理,Hadoop将各个节点上资源(CPU、内存、网络IO、磁盘IO等等)等量切分成若干份,每一份用“slot”表示,同时规定一个task可根据实际情况需要占用多个”slot”。
简单的说:hadoop1.0将多维度的资源进行了抽象,使用“slot”来表示,从而简化对资源的管理。

资源分配模型
而资源分配模型则决定如何将资源分配给各个作业/任务,在Hadoop中,这一部分由一个插拔式的调度器完成。

更进一步说,slot相当于运行的“许可证”,一个任务只有获得“许可证”后,才能够获得运行的机会,这也意味着,每一个节点上的slot的数量决定了当前节点能够并发执行多少个任务。Hadoop1.0为了区分MapTask跟ReduceTask所使用资源的差异,进一步将slot分为MapSlot跟ReduceSlot,他们分别只能被MapTask跟ReduceTask使用。

Hadoop集群管理员可根据各个节点硬件配置和应用特点为它们分配不同的map slot数(由参数mapred.tasktracker.map.tasks.maximum指定)和reduce slot数(由参数mapred.tasktrackerreduce.tasks.maximum指定)

静态资源配置 。 采用了静态资源设置策略,即每个节点事先配置好可用的slot总数,这些slot数目一旦启动后无法再动态修改。
资源无法共享 。 Hadoop 1.0将slot分为Map slot和Reduce slot两种,且不允许共享。对于一个作业,刚开始运行时,Map slot资源紧缺而Reduce slot空闲,当Map Task全部运行完成后,Reduce slot紧缺而Map slot空闲。很明显,这种区分slot类别的资源管理方案在一定程度上降低了slot的利用率。
资源划分粒度过大 。资源划分粒度过大,往往会造成节点资源利用率过高或者过低 ,比如,管理员事先规划好一个slot代表2GB内存和1个CPU,如果一个应用程序的任务只需要1GB内存,则会产生“资源碎片”,从而降低集群资源的利用率,同样,如果一个应用程序的任务需要3GB内存,则会隐式地抢占其他任务的资源,从而产生资源抢占现象,可能导致集群利用率过高。
没引入有效的资源隔离机制 。Hadoop 1.0仅采用了基于jvm的资源隔离机制,这种方式仍过于粗糙,很多资源,比如CPU,无法进行隔离,这会造成同一个节点上的任务之间干扰严重。

主要是InputFormat。InputFormat类有2个重要的作用:
1)将输入的数据切分为多个逻辑上的InputSplit,其中每一个InputSplit作为一个map的输入。
2)提供一个RecordReader,用于将InputSplit的内容转换为可以作为map输入的k,v键值对。

系统默认的RecordReader是 LineRecordReader ,它是 TextInputFormat (FileInputFormat的子类)对应的RecordReader; Map读入的Key值是偏移量,Value是行内容。

两个Mapper各自输入一块数据,由键值对构成,对它进行加工(加上了个字符n),然后按加工后的数据的键进行分组,相同的键到相同的机器。这样的话,第一台机器分到了键nk1和nk3,第二台机器分到了键nk2。

接下来再在这些Reducers上执行聚合操作(这里执行的是是count),输出就是nk1出现了2次,nk3出现了1次,nk2出现了3次。从全局上来看,MapReduce就是一个分布式的GroupBy的过程。

从上图可以看到,Global Shuffle左边,两台机器执行的是Map。Global Shuffle右边,两台机器执行的是Reduce。

Hadoop会将输入数据划分成等长的数据块,成为数据分片。Hadoop会为每个分片构建一个map任务。并行的处理分片时间肯定会少于处理整个大数据块的时间,但由于各个节点性能及作业运行情况的不同,每个分片的处理时间可能不一样,因此, 把数据分片切分的更细可以得到更好的负载均衡

但另一方面,分片太小的话,管理分片和构建map任务的时间将会增多。因此,需要在hadoop分片大小和处理分片时间之间做一个权衡。对大多数作业来说,一个分片大小为64MB比较合适,其实,Hadoop的默认块大小也是64MB。

我们上面看到了hadoop的数据块大小与最佳分片大小相同,这样的话,数据分片就不容易跨数据块存储,因此,一个map任务的输入分片便可以直接读取本地数据块,这就 避免了再从其它节点读取分片数据 ,从而节省了网络开销。

map的任务输出是 写入到本地磁盘而非HDFS的 。那么为什么呢?因为map任务输出的是中间结果,一旦map任务完成即会被删除,如果把它存入HDFS中并实现备份容错,未免有点大题小做。如果一个map任务失败,hadoop会再另一个节点重启map一个map任务。

而reduce任务并不具备数据本地化优势——单个reduce任务的输入通常来自所有mapper输出。一般排序过的map输出需要通过 网络传输 发送到运行reduce任务的节点,并在reduce端进行合并。reduce的输出通常需要存储到HDFS中以实现可靠存储。每个reduce输出HDFS块第一个复本会存储在本地节点,而其它复本则存储到其它节点,因此reduce输出也需要占用网络带宽。

1.调整reduce个数方法(1)
(1)每个Reduce处理的数据量默认是256MB

(2)每个任务最大的reduce数,默认为1009

(3)计算reducer数的公式

2.调整reduce个数方法(2)
在hadoop的mapred-default.xml文件中修改,设置每个job的Reduce个数

3.reduce个数并不是越多越好
(1)过多的启动和初始化reduce也会消耗时间和资源;
(2)另外,有多少个reduce,就会有多少个输出文件,如果产生了很多个小文件,那么如果这些小文件作为下一个任务的输入,则也会出现小文件过多的问题;
在设置reduce个数的时候也需要考虑这两个原则:处理大数据利用适合的reduce数;使单个reduce任务处理数据大小要合适;

在进行map计算之前,mapreduce会根据输入文件计算输入分片(input split),每个输入分片(input split)针对一个map任务,输入分片(input split)存储的并非数据本身,而是一个分片长度和一个记录数据的位置的数组,输入分片(input split)往往和hdfs的block(块)关系很密切,我们没有设置分片的范围的时候,分片大小是由block块大小决定的,和它的大小一样。

比如把一个258MB的文件上传到HDFS上,假设block块大小是128MB,那么它就会被分成三个block块,与之对应产生三个split,所以最终会产生三个map task。我又发现了另一个问题,第三个block块里存的文件大小只有2MB,而它的block块大小是128MB,那它实际占用Linux file system的多大空间?答案是实际的文件大小,而非一个块的大小。最后一个问题是: 如果hdfs占用Linux file system的磁盘空间按实际文件大小算,那么这个”块大小“有必要存在吗?其实块大小还是必要的,一个显而易见的作用就是当文件通过append操作不断增长的过程中,可以通过来block size决定何时split文件。

1.每个输入分片会让一个map任务来处理,map输出的结果会暂且放在一个环形内存缓冲区中(该缓冲区的大小默认为100M,由io.sort.mb属性控制),当该缓冲区快要溢出时(默认为缓冲区大小的80%,由io.sort.spill.percent属性控制),会在本地文件系统中创建一个溢出文件,将该缓冲区中的数据写入这个文件。

2.在写入磁盘之前,线程首先根据reduce任务的数目将数据划分为相同数目的分区,也就是一个reduce任务对应一个分区的数据。这样做是为了避免有些reduce任务分配到大量数据,而有些reduce任务却分到很少数据,甚至没有分到数据的尴尬局面。其实分区就是对数据进行hash的过程。然后对每个分区中的数据进行 排序 ,如果此时设置了Combiner,将排序后的结果进行Combiner操作,主要是在map计算出中间文件前做一个简单的合并重复key值的操作,这样做的目的是让尽可能少的数据写入到磁盘。

3.当map任务输出最后一个记录时,可能会有很多的溢出文件,这时需要将这些文件合并。合并的过程中会不断地进行排序和Combiner操作,目的有两个:1.尽量减少每次写入磁盘的数据量;2.尽量减少下一复制阶段网络传输的数据量。最后合并成了一个 已分区且已排序 的文件。为了减少网络传输的数据量,这里可以将数据压缩,只要将mapred.compress.map.out设置为true就可以了。

4.将分区中的数据 拷贝 (网络传输)给相对应的reduce任务。有人可能会问:分区中的数据怎么知道它对应的reduce是哪个呢?其实map任务一直和其父TaskTracker保持联系,而TaskTracker又一直和JobTracker保持心跳。所以JobTracker中保存了整个集群中的宏观信息。只要reduce任务向JobTracker获取对应的map输出位置就ok了哦。

Reduce端:
1.Reduce会接收到不同map任务传来的数据,并且每个map传来的数据都是有序的。如果reduce端接受的数据量相当小,则直接存储在内存中(缓冲区大小由mapred.job.shuffle.input.buffer.percent属性控制,表示用作此用途的堆空间的百分比),如果数据量超过了该缓冲区大小的一定比例(由mapred.job.shuffle.merge.percent决定),则对 数据合并 溢写 到磁盘中。

2.随着溢写文件的增多,后台线程会将它们合并成一个更大的有序的文件,这样做是为了给后面的合并节省时间。其实不管在map端还是reduce端,MapReduce都是反复地执行排序,合并操作,现在终于明白了有些人为什么会说:排序是hadoop的灵魂。

3.合并的过程中会产生许多的中间文件(写入磁盘了),但MapReduce会让写入磁盘的数据尽可能地少,并且 最后一次合并的结果 并没有写入磁盘,而是直接输入到reduce函数。

Read阶段 :MapTask通过用户编写的RecordReader,从输入InputSplit中解析出一个个key/value

Map阶段 :该节点主要是将解析出的key/value交给用户编写map()函数处理,并产生一系列新的key/value。

Collect收集阶段 :在用户编写map()函数中,当数据处理完成后,一般会调用OutputCollection.collect()输出结果。在该函数内部,它会将生成的 key/value分区 (调用Partitioner),并写入一个环形内存缓冲区中。

Spill阶段 :即“溢写”,当环形缓冲区满后,MapReduce会将数据写入本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地 排序 ,并在必要时对数据进行 combiner 压缩 等操作。

溢写阶段详情:

合并阶段 :当所有数据处理完成后,MapTask对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。在进行文件合并过程中,MapTask以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式。每轮合并io.sort.factor(默认100)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。让一个MapTask最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销。

已赞过 已踩过<
你对这个回答的评价是?
评论 收起
推荐律师服务: 若未解决您的问题,请您详细描述您的问题,通过百度律临进行免费专业咨询

为你推荐:

下载百度知道APP,抢鲜体验
使用百度知道APP,立即抢鲜体验。你的手机镜头里或许有别人想知道的答案。
扫描二维码下载
×

类别

我们会通过消息、邮箱等方式尽快将举报结果通知您。

说明

0/200

提交
取消

辅 助

模 式