在之前的数篇笔记中我们从零开始完成了 Hadoop 分布式集群的搭建。从这一篇笔记开始,我们要通过 MapReduce 计算框架的学习和实践,将集群转化为生产力。
本篇是系列的第三篇笔记。本篇笔记中我们将会继续分析学习 Mapper 任务启动和数据读取的相关源代码,从而加深对相关流程的理解。
Map Task 源代码分析(一)
Mapper.run()
在第一篇笔记的 WordCount 程序中,我们的 MyMapper 类继承了 Mapper 类。打开 Mapper 类源代码,发现其核心在于run()
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| ... public void run(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException { this.setup(context);
try { while(context.nextKeyValue()) { map(context.getCurrentKey(), context.getCurrentValue(), context); } } finally { cleanup(context); } }
...
|
run()
方法内容很简单。方法中有一个参数context
。我们不断地从中获取键值对进行操作。同时在我们实现的map()
方法中,Map Task 中键值对的输出也依赖于context
。这里就产生了两个问题:第一,Context 对象是何时实例化的;第二,run()
方法是谁来调用的。
MapTask
在 Yarn 资源调度框架中,Application Master 会在不同节点协调分配 Container,然后启动 MapTask 和 Reduce Task。真正执行 Map Task 的就是 MapTask 类。打开 hadoop-mapreduce-client-core-2.6.5.jar 包下的 org.apache.hadoop.MapTask.java 类。我们会发现这个类中并没有 main 方法,而在实际运行中,这个类是通过反射实现的。MapTask 中至关重要的方法便是run()
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| ... public void run(JobConf job, TaskUmbilicalProtocol umbilical) throws IOException, ClassNotFoundException, InterruptedException { ... if (this.isMapTask()) { if (this.conf.getNumReduceTasks() == 0) { this.mapPhase = this.getProgress().addPhase("map", 1.0F); } else { this.mapPhase = this.getProgress().addPhase("map", 0.667F); this.sortPhase = this.getProgress().addPhase("sort", 0.333F); } }
... }
...
|
首先看到的是 Map 和 Sort 作业的时间分配。我们发现 MR 框架允许没有 Reduce 作业存在。在这种情况下框架直接输出了 Map 的结果。在有 Reduce 作业的情况下,Map 和 Sort 的时间分配是2 : 1。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| ... public void run(JobConf job, TaskUmbilicalProtocol umbilical) throws IOException, ClassNotFoundException, InterruptedException { ... if (...) { ... } else { if (useNewApi) { this.runNewMapper(job, this.splitMetaInfo, umbilical, reporter); } else { this.runOldMapper(job, this.splitMetaInfo, umbilical, reporter); }
this.done(umbilical, reporter); } }
...
|
RunNewMapper()
获取 Mapper 类
接下来的代码中我们根据 API 版本(是否使用 Yarn)选择调用runNewMapper
或者runOldMapper
方法。这里重点看runNewMapper
。这里面这样的两行代码:
1 2 3 4 5 6 7
| ...
TaskAttemptContext taskContext = new TaskAttemptContextImpl(job, this.getTaskID(), reporter); Mapper<INKEY, INVALUE, OUTKEY, OUTVALUE> mapper = (Mapper)ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
...
|
我们首先声明了一个 taskContext,任务上下文容器对象。下一行中我们通过反射得到了 Mapper 类。打开getMapperClass()
方法的实现我们会发现,返回的 Mapper 类由参数MAP_CLASS_ATTR
指定。如果没有指定的话则返回一个默认的 Mapper 类。
继续看下一行:
1 2 3 4 5 6
| ...
InputFormat<INKEY, INVALUE> inputFormat = (InputFormat)ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
...
|
这里声明了 InputFormat 类对象 inputFormat,获取的方式和 Mapper 类是类似的。指定 inputFormat 的参数是INPUT_FORMAT_CLASS_ATTR
。如果没有指定,返回的是 TextInputFormat
。
获取 split
继续往下看:
1 2 3 4 5 6 7
| ...
org.apache.hadoop.mapreduce.InputSplit split = null; split = (org.apache.hadoop.mapreduce.InputSplit)this.getSplitDetails(new Path(splitIndex.getSplitLocation()), splitIndex.getStartOffset());
...
|
切片对象我们在提交作业的过程中已经通过 InputFormat 对象完成了。这里我们获得 split 对象。可以发现有关 split 的关键信息是切片所在的节点、切片的偏移量等。
获取 RecordReader
继续往下看:
1 2 3 4 5 6
| ...
org.apache.hadoop.mapreduce.RecordReader<INKEY, INVALUE> input = new MapTask.NewTrackingRecordReader(split, inputFormat, reporter, taskContext);
...
|
这里我们实例化了一个 RecordReader 对象。我们传递了分片、InputFormat、上下文容器等任务关键的对象。到NewTrackingRecordReader()
方法中看一看,会发现里面还有一行实例化 RecordReader 的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| ...
static class NewTrackingRecordReader<K, V> extends org.apache.hadoop.mapreduce.RecordReader<K, V> { private final org.apache.hadoop.mapreduce.RecordReader<K, V> real;
...
this.real = inputFormat.createRecordReader(split, taskContext); ... }
...
|
这里单独将它实例化说明后面会频繁调用。我们点进createRecordReader()
方法的实现(TextInputFormat):
1 2 3 4 5 6 7 8 9 10 11 12
| ...
public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) { String delimiter = context.getConfiguration().get("textinputformat.record.delimiter"); byte[] recordDelimiterBytes = null; if (null != delimiter) { recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8); }
return new LineRecordReader(recordDelimiterBytes); }
|
这个方法其实没干什么事情,就是指定了一下分隔符。重点在于返回的LineRecordReader
。顾名思义,这是一个按行读取数据的读取器。点开来看一下,发现里面有诸如getCurrentKey()
、getCurrentValue()
等方法。这些方法在 NewTrackingRecordReader 中也有存在。我们构造了一个 LineRecordReader,将它赋给 NewTrackingRecordReader 中的 real 成员变量,并通过这个变量来读取数据。回过头来看,NewTrackingRecordReader 又被赋给了 MapTask 中的 input 变量。于是我们明白了,MapTask 是通过 RecordReader(具体到 TextInputFormat 则是通过 LineRecordReader)实现 split 数据读取的。
获取 RecordWriter
有了记录读取器 RecordReader,自然也会有记录书写器 RecordWriter。本篇笔记重点关注数据是如何读取的。RecordWriter 的相关内容留到下一篇笔记当中。
获取 Context
继续看:
1 2 3 4 5 6 7 8
| ...
MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> mapContext = new MapContextImpl(job, this.getTaskID(), input, (RecordWriter)output, this.committer, reporter, split); org.apache.hadoop.mapreduce.Mapper.Context mapperContext = (new WrappedMapper()).getMapContext(mapContext);
...
|
我们实例化了 MapContext 对象。我们提供了 job、当前任务的 ID、读取器、书写器、提交器、报告器、分片等参数。打开 MapContextImpl 我们会发现参数 reader 被赋给了成员变量 reader。于是我们知道了读取器是被封装在 MapContext 容器中的。
而在下一行代码中,我们进一步将 mapContext 封装到了 mapperContext 对象中。接下来的 Map 操作会依赖于这个对象。
初始化读取的数据
继续看:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| ...
try { input.initialize(split, mapperContext); mapper.run(mapperContext); this.mapPhase.complete(); this.setPhase(Phase.SORT); this.statusUpdate(umbilical); input.close(); input = null; ((RecordWriter)output).close(mapperContext); output = null; } finally { this.closeQuietly((org.apache.hadoop.mapreduce.RecordReader)input); this.closeQuietly((RecordWriter)output, mapperContext); }
...
|
我们首先进行了一个初始化操作,对象是 input,即 NewTrackingRecordReader。点开看看里面有什么(LineRecordReader):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| ...
public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException { FileSplit split = (FileSplit)genericSplit; Configuration job = context.getConfiguration(); this.maxLineLength = job.getInt("mapreduce.input.linerecordreader.line.maxlength", 2147483647); this.start = split.getStart(); this.end = this.start + split.getLength(); Path file = split.getPath(); FileSystem fs = file.getFileSystem(job); this.fileIn = fs.open(file); CompressionCodec codec = (new CompressionCodecFactory(job)).getCodec(file); if (null != codec) { ... } else { this.fileIn.seek(this.start); this.in = new UncompressedSplitLineReader(this.fileIn, job, this.recordDelimiterBytes, split.getLength()); this.filePosition = this.fileIn; }
if (this.start != 0L) { this.start += (long)this.in.readLine(new Text(), 0, this.maxBytesToConsume(this.start)); }
this.pos = this.start; }
...
|
我们注意到方法的前半部分在获得了一些关于任务的参数和对象后,我们通过fs.open()
方法打开了文件输入流。这个流的seek()
方法可以读取指定位置的数据(不需要每次都读取完整的文件)。在这个方法的最后我们检查了起始位置是否为 0。如果不为 0(即不是第一个分片),那么我们会丢弃第一行数据。因为除了最后一个分片以外,我们总是会通过next()
方法多读取一行数据(源代码注释里面写的)。这样做可以将原始数据中因为分块被断开的行重新连接起来。
所以初始化的工作是规定了读取的切片和具体的数据。
mapper.run()
紧接着初始化之后的就是mapper.run()
方法。终于看到了熟悉的 mapper 对象。至此我们就可以回答最开头的问题:调用 mapper 对象中run()
方法的就是 MapTask 当中的这一行代码;mapper 对象中一直要用到的 context 则是在经历了之前的一系列封装后于此时被传入run()
方法中的。
小结
- 通过
fs.open(filePath)
可以打开文件输入流,输入流的.seek()
方法可以帮助我们读取指定位置的数据;
- LineRecordReader 读取 split 时,除了第一个切片以外,其余的切片在读取时都会忽略第一行;除了最后一个切片以外,其余的切片都会多读取一行。这样做是为了将某些在分块时被一分为二的行重新连接起来。