Hadoop--MapReduce源码分析

  本片博客Hadoop的MapReduce源码分析,主要对上篇博客Hadoop-MapReduce详解/做进一步的理解。MapReduce源码分析东西太多,截图都截累了,前面比较详细一点靠后面可能只把主要的贴了上来,空闲了我在做一些补充吧!

  • MapReduce过程概述
  1. Job提交任务,会做初始化配置文件、检查Output Path、请求RM获取JobId、复制资源文件到HDFS等操作然后提交job作业;
  2. RM接收到作业后会初始化Job对象,然后启动一个NM会从HDFS获取资源文件分配一个Container,在Container中启动一个AM,在过程中会把文件分割为多个输入分片;
  3. 每个输入分片会让一个map任务来处理输出多个键值对,输出的结果会暂且放在一个环形内存缓冲区中;
  4. 紧接着就是分区,目的是把k2分配到不同的reduce task;
  5. map输出时可能会有很多的溢出文件,要将这些文件合并。合并的过程中会不断地进行排序和combia操作,最后合并成了一个已分区且已排序的文件。
  6. reduce阶段,接收到不同map任务传来的数据,如果接收数据大超过阈值就会溢出写入磁盘,溢出文件增多合并成大的有序的文件(反复地执行排序,合并操作)
  7. MapReduce会让写入磁盘的数据尽可能地少,并且最后一次合并的结果并没有写入磁盘,而是直接输入到reduce函数。 .MapReduce的Shuffle和排序,图片来源:http://weixiaolu.iteye.com/blog/1474172

Job提交作业

对Job提交作业画了一个简单的图

  1. Job开始提交任务
    一个简单的MapReduce开始提交任务

  2. 接下来我们进入Job类看一看waitForCompletion方法,waitForCompletion方法中主要调用了两个方法:submit()monitorAndPrintJob(),如下图所示:
    Job类的waitForCompletion()方法

submit方法

  • 在waitForCompletion中,我们先看一看submit方法,该方法主要提交Job,调用JobSubmitter.submitJobInternal()方法完成
    waitForCompletion的submit()方法

submitJobInternal

  • submitJobInternal(),主要有以下几个主要功能:
    1、会检查job的Output Path;
    2、设置Job ID
    3、设置Job 命令选项
    4、分割、切片并创建splits文件
    5、复制配置文件到HDFS
    6、正在的提交Job

  • 具体代码如下:



    submitJobInternal()方法

  1. checkSpecs() 方法检查output文件;
  2. setJobID() 设置Job ID;
  3. writeSplits() 写Splits文件;
  4. cleanUpTokenReferral() 清除jobtoken referrals;
  5. writeConf() 写入job相关的配置文件到HDFS;
  6. submitClient.submitJob() 做真正的提交Job。

writeSplits

  • 在看submitClient.submitJob()之前我们先看一看分割、切片 writeSplits() 方法
    writeSplits() 方法

  • writeSplits下面调用了writeNewSplits()writeOldSplits()方法,我们只看一个writeNewSplits方法:

  1. 通过InputFormat的getSplits()方法获取一个List
  2. 排序后再创建SplitFiles
    writeNewSplits()方法

InputFormat

InputFormat抽象类仅有两个抽象方法:

  1. List getSplits(), 获取由输入文件计算出输入分片(InputSplit),解决数据或文件分割成片问题。
  2. RecordReader<K,V> createRecordReader(),创建RecordReader,从InputSplit中读取数据,解决读取分片中数据问题
  3. 具体完成以下功能
    3.1. 验证作业输入的正确性
    3.2. 将输入文件切割成逻辑分片(InputSplit),一个InputSplit将会被分配给一个独立的MapTask
    3.3. 提供RecordReader实现,读取InputSplit中的“K-V对”供Mapper使用
  • 一、getSplits()
  • 一.1、getSplits()InputFormat接口的抽象方法,所以我们在具体InputFormat的实现类下面的实现方法就行了,如下图:
    具体**InputFormat**的实现类

  • 一.2、下面我们来看看FileInputFormat在这里,记得弄清楚Splits和Block的关系

    FileInputFormat的getSplits()方法

  1. 一个InputSplit对应一个Map Task
  2. 默认文件可分割(true),这样一个block(128MB)就对应一个InputSplit(通过computeSplitSize(…)方法)
  3. 通过分析while循环,得知一个InputSplit对应一个block有利于map计算的数据本地化
  4. 果文件不允许分隔整个文件作为一个InputSplit,这样一个InputSplit就可能对应多个block
    注意:getSplits完成后返回一个List InputSplit,然后对它进行排序sort创建Split文件createSplitFiles具体的排序算法这些博主就不一一贴图了,比较好理解,大家可以自己去看看。

提交JOB到RM

下面看一看ClientProtocol 提交Job submitJob的实现类YARNRunner,主要是提交JobResourceManager
submitJob的实现类YARNRunner

monitorAndPrintJob

  • 上面submit方法已经提交job了,该方法就是做一些监控,并打印日志信息

    monitorAndPrintJob()方法

读取InputSplit中的数据

  • 读取InputSplit中的数据,转成k1,v1 ,由LineRecordReader完成,存在于TextInputFormat
    TextInputFormat类
  1. TextInputFormat方法 相当于一个解释器,有很多的解释器,TextInputFormat为默认的文本解释器;
  2. TextInputFormat方法 重写了isSplitable方法和 RecordReader 方法()相当于定义了自己的规则;
  3. getRecordReader方法 最后返回了一个LineRecordReader的实例
    :我们的job程序默认使用的就是TextInputFormat,该类的泛型已经明确指定了,所以在job配置的时候,不需要指定k1,v1的类型。
  • 下面我们来一起看一看LineRecordReader类
    LineRecordReader类
  1. 先看看父类RecordReader 因为比较清晰明了,这几个方法很重要,如下图所示:
    LineRecordReader的父类RecordReader

  2. 返回LineRecordReader,看一看大致有哪些具体实现
    返回LineRecordReader

  3. 看一看几个重要的方法,如下图源码每次调用nextKeyValue()时,value表示当前行的内容,key表示已经读取后的位置具体实现:
    LineRecordReader的nextKeyValue具体的实现

  4. 在MR框架中,有很多InputFormat和FileInputFormat的实现类。比如SequenceFileInputFormatNLineInputFormatDBInputFormat等DBInputFormat表示从SQL表中读取数据,可以看出我们不仅仅是从HDFS中取数据作为输入还可以从数据库中读取数据,来一个简单的截图:
    InputFormat的具体实现DBInputFormat

执行map task

  1. 在我们写代码的时候,可以覆盖setupmapcleanup方法;
  2. 框架调用map的时候,是通过反射的方式产生的,然后调用实例化对象中的run()。
    Mapper类

  3. 注意我们Mapper类write()方法,其实是调用了org.apache.hadoop.mapreduce.Mapper.Context

  • 程序真正执行时,是启动了一个YarnChild类,下面我们来分析框架如何调用map task或者reduce task。
    YarnChild类的run()方法
  1. 看一下taskFinal的一个实现类是MapTask
    taskFinal的一个实现类是MapTask

  2. 继续分析下面的runNewMapper()方法
    下面跳过一部分代码截图......
    runNewMapper()方法
    在这里可以看出:

    如果没有reduce task,那么map直接把<k2,v2>输出。如果有,创建排序的输出。
    在这里,output实际上是WordCountMapper类中的map()方法里面的context.write()实际调用,就是在output中调用的。

  3. 接下来分析NewOutputCollector类的实现的构造方法partitioner类是在这里实例化的
    NewOutputCollector类的实现的构造方法

分区

  • 我们还是在NewOutputCollector方法里主要看看write(…)方法。
    NewOutputCollector的write(...)方法

  • 当在WordCountMapper类的map()方法中调用context.write(…)的时候,实际上是调用collector.collect(…) ,在这个方法的形参中,第三个参数就是分区。
    collector.collect(...)方法

  • 在这里collector的实现类是MapOutputBuffer类。
    常见异常,以及SpillThread

排序

  • 我们就到了SpillThread类,接下来分析SpillThread类,直接看run()方法,代码比较简洁:
    SpillThread类的run()方法
  1. 继续分析sortAndSpill()方法,可以看到,在spill之前,会先进行sort操作。
    sortAndSpill()方法

合并

  • 现在我们任然在SpillThread类里面,继续看下面的代码:
    SpillThread类里面的合并逻辑代码部分
    上面的代码说明:如果没有combiner,直接把<k2,v2>写入到磁盘。如果有combiner,先执行combiner再写入磁盘。

写入磁盘(spill)

  • 写入磁盘(spill),实际上是通过调用InMemoryWriter来实现的。
    写入磁盘(spill)

reduce的全过程

  • 上面已经把map的过程走完了,截图真累!!下面我们简单看看Reduce吧!Reducer和Mapper很类似,这里就不一一解释了:
    Reducer和Mapper很类似

  • 直奔主题ReduceTask的run()方法

  • 继续向下看代码。
    运行shuffer代码逻辑

  • 查看shuffleComsumerPlugin.run()的时候,跳转到Shuffle类中。

当前网速较慢或者你使用的浏览器不支持博客特定功能,请尝试刷新或换用Chrome、Firefox等现代浏览器