在之前的数篇笔记中我们从零开始完成了 Hadoop 分布式集群的搭建。从这一篇笔记开始,我们要通过 MapReduce 计算框架的学习和实践,将集群转化为生产力。
本篇是系列的第三篇笔记。本篇笔记中我们将会继续分析学习 Mapper 任务启动和数据读取的相关源代码,从而加深对相关流程的理解。
Map Task 源代码分析(一)
Mapper.run()
在第一篇笔记的 WordCount 程序中,我们的 MyMapper 类继承了 Mapper 类。打开 Mapper 类源代码,发现其核心在于run()方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| ... public void run(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException { this.setup(context);
try { while(context.nextKeyValue()) { map(context.getCurrentKey(), context.getCurrentValue(), context); } } finally { cleanup(context); } }
...
|
run()方法内容很简单。方法中有一个参数context。我们不断地从中获取键值对进行操作。同时在我们实现的map()方法中,Map Task 中键值对的输出也依赖于context。这里就产生了两个问题:第一,Context 对象是何时实例化的;第二,run()方法是谁来调用的。
MapTask
在 Yarn 资源调度框架中,Application Master 会在不同节点协调分配 Container,然后启动 MapTask 和 Reduce Task。真正执行 Map Task 的就是 MapTask 类。打开 hadoop-mapreduce-client-core-2.6.5.jar 包下的 org.apache.hadoop.MapTask.java 类。我们会发现这个类中并没有 main 方法,而在实际运行中,这个类是通过反射实现的。MapTask 中至关重要的方法便是run()方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| ... public void run(JobConf job, TaskUmbilicalProtocol umbilical) throws IOException, ClassNotFoundException, InterruptedException { ... if (this.isMapTask()) { if (this.conf.getNumReduceTasks() == 0) { this.mapPhase = this.getProgress().addPhase("map", 1.0F); } else { this.mapPhase = this.getProgress().addPhase("map", 0.667F); this.sortPhase = this.getProgress().addPhase("sort", 0.333F); } }
... }
...
|
首先看到的是 Map 和 Sort 作业的时间分配。我们发现 MR 框架允许没有 Reduce 作业存在。在这种情况下框架直接输出了 Map 的结果。在有 Reduce 作业的情况下,Map 和 Sort 的时间分配是2 : 1。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| ... public void run(JobConf job, TaskUmbilicalProtocol umbilical) throws IOException, ClassNotFoundException, InterruptedException { ... if (...) { ... } else { if (useNewApi) { this.runNewMapper(job, this.splitMetaInfo, umbilical, reporter); } else { this.runOldMapper(job, this.splitMetaInfo, umbilical, reporter); }
this.done(umbilical, reporter); } }
...
|
RunNewMapper()
获取 Mapper 类
接下来的代码中我们根据 API 版本(是否使用 Yarn)选择调用runNewMapper或者runOldMapper方法。这里重点看runNewMapper。这里面这样的两行代码:
1 2 3 4 5 6 7
| ...
TaskAttemptContext taskContext = new TaskAttemptContextImpl(job, this.getTaskID(), reporter); Mapper<INKEY, INVALUE, OUTKEY, OUTVALUE> mapper = (Mapper)ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
...
|
我们首先声明了一个 taskContext,任务上下文容器对象。下一行中我们通过反射得到了 Mapper 类。打开getMapperClass()方法的实现我们会发现,返回的 Mapper 类由参数MAP_CLASS_ATTR指定。如果没有指定的话则返回一个默认的 Mapper 类。
继续看下一行:
1 2 3 4 5 6
| ...
InputFormat<INKEY, INVALUE> inputFormat = (InputFormat)ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
...
|
这里声明了 InputFormat 类对象 inputFormat,获取的方式和 Mapper 类是类似的。指定 inputFormat 的参数是INPUT_FORMAT_CLASS_ATTR。如果没有指定,返回的是 TextInputFormat。
获取 split
继续往下看:
1 2 3 4 5 6 7
| ...
org.apache.hadoop.mapreduce.InputSplit split = null; split = (org.apache.hadoop.mapreduce.InputSplit)this.getSplitDetails(new Path(splitIndex.getSplitLocation()), splitIndex.getStartOffset());
...
|
切片对象我们在提交作业的过程中已经通过 InputFormat 对象完成了。这里我们获得 split 对象。可以发现有关 split 的关键信息是切片所在的节点、切片的偏移量等。
获取 RecordReader
继续往下看:
1 2 3 4 5 6
| ...
org.apache.hadoop.mapreduce.RecordReader<INKEY, INVALUE> input = new MapTask.NewTrackingRecordReader(split, inputFormat, reporter, taskContext);
...
|
这里我们实例化了一个 RecordReader 对象。我们传递了分片、InputFormat、上下文容器等任务关键的对象。到NewTrackingRecordReader()方法中看一看,会发现里面还有一行实例化 RecordReader 的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| ...
static class NewTrackingRecordReader<K, V> extends org.apache.hadoop.mapreduce.RecordReader<K, V> { private final org.apache.hadoop.mapreduce.RecordReader<K, V> real;
...
this.real = inputFormat.createRecordReader(split, taskContext); ... }
...
|
这里单独将它实例化说明后面会频繁调用。我们点进createRecordReader()方法的实现(TextInputFormat):
1 2 3 4 5 6 7 8 9 10 11 12
| ...
public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) { String delimiter = context.getConfiguration().get("textinputformat.record.delimiter"); byte[] recordDelimiterBytes = null; if (null != delimiter) { recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8); }
return new LineRecordReader(recordDelimiterBytes); }
|
这个方法其实没干什么事情,就是指定了一下分隔符。重点在于返回的LineRecordReader。顾名思义,这是一个按行读取数据的读取器。点开来看一下,发现里面有诸如getCurrentKey()、getCurrentValue()等方法。这些方法在 NewTrackingRecordReader 中也有存在。我们构造了一个 LineRecordReader,将它赋给 NewTrackingRecordReader 中的 real 成员变量,并通过这个变量来读取数据。回过头来看,NewTrackingRecordReader 又被赋给了 MapTask 中的 input 变量。于是我们明白了,MapTask 是通过 RecordReader(具体到 TextInputFormat 则是通过 LineRecordReader)实现 split 数据读取的。
获取 RecordWriter
有了记录读取器 RecordReader,自然也会有记录书写器 RecordWriter。本篇笔记重点关注数据是如何读取的。RecordWriter 的相关内容留到下一篇笔记当中。
获取 Context
继续看:
1 2 3 4 5 6 7 8
| ...
MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> mapContext = new MapContextImpl(job, this.getTaskID(), input, (RecordWriter)output, this.committer, reporter, split); org.apache.hadoop.mapreduce.Mapper.Context mapperContext = (new WrappedMapper()).getMapContext(mapContext);
...
|
我们实例化了 MapContext 对象。我们提供了 job、当前任务的 ID、读取器、书写器、提交器、报告器、分片等参数。打开 MapContextImpl 我们会发现参数 reader 被赋给了成员变量 reader。于是我们知道了读取器是被封装在 MapContext 容器中的。
而在下一行代码中,我们进一步将 mapContext 封装到了 mapperContext 对象中。接下来的 Map 操作会依赖于这个对象。
初始化读取的数据
继续看:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| ...
try { input.initialize(split, mapperContext); mapper.run(mapperContext); this.mapPhase.complete(); this.setPhase(Phase.SORT); this.statusUpdate(umbilical); input.close(); input = null; ((RecordWriter)output).close(mapperContext); output = null; } finally { this.closeQuietly((org.apache.hadoop.mapreduce.RecordReader)input); this.closeQuietly((RecordWriter)output, mapperContext); }
...
|
我们首先进行了一个初始化操作,对象是 input,即 NewTrackingRecordReader。点开看看里面有什么(LineRecordReader):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| ...
public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException { FileSplit split = (FileSplit)genericSplit; Configuration job = context.getConfiguration(); this.maxLineLength = job.getInt("mapreduce.input.linerecordreader.line.maxlength", 2147483647); this.start = split.getStart(); this.end = this.start + split.getLength(); Path file = split.getPath(); FileSystem fs = file.getFileSystem(job); this.fileIn = fs.open(file); CompressionCodec codec = (new CompressionCodecFactory(job)).getCodec(file); if (null != codec) { ... } else { this.fileIn.seek(this.start); this.in = new UncompressedSplitLineReader(this.fileIn, job, this.recordDelimiterBytes, split.getLength()); this.filePosition = this.fileIn; }
if (this.start != 0L) { this.start += (long)this.in.readLine(new Text(), 0, this.maxBytesToConsume(this.start)); }
this.pos = this.start; }
...
|
我们注意到方法的前半部分在获得了一些关于任务的参数和对象后,我们通过fs.open()方法打开了文件输入流。这个流的seek()方法可以读取指定位置的数据(不需要每次都读取完整的文件)。在这个方法的最后我们检查了起始位置是否为 0。如果不为 0(即不是第一个分片),那么我们会丢弃第一行数据。因为除了最后一个分片以外,我们总是会通过next()方法多读取一行数据(源代码注释里面写的)。这样做可以将原始数据中因为分块被断开的行重新连接起来。
所以初始化的工作是规定了读取的切片和具体的数据。
mapper.run()
紧接着初始化之后的就是mapper.run()方法。终于看到了熟悉的 mapper 对象。至此我们就可以回答最开头的问题:调用 mapper 对象中run()方法的就是 MapTask 当中的这一行代码;mapper 对象中一直要用到的 context 则是在经历了之前的一系列封装后于此时被传入run()方法中的。
小结
- 通过
fs.open(filePath)可以打开文件输入流,输入流的.seek()方法可以帮助我们读取指定位置的数据;
- LineRecordReader 读取 split 时,除了第一个切片以外,其余的切片在读取时都会忽略第一行;除了最后一个切片以外,其余的切片都会多读取一行。这样做是为了将某些在分块时被一分为二的行重新连接起来。