0%

MapReduce 编程和源代码学习(三)

在之前的数篇笔记中我们从零开始完成了 Hadoop 分布式集群的搭建。从这一篇笔记开始,我们要通过 MapReduce 计算框架的学习和实践,将集群转化为生产力。

本篇是系列的第三篇笔记。本篇笔记中我们将会继续分析学习 Mapper 任务启动和数据读取的相关源代码,从而加深对相关流程的理解。

Map Task 源代码分析(一)


Mapper.run()

在第一篇笔记的 WordCount 程序中,我们的 MyMapper 类继承了 Mapper 类。打开 Mapper 类源代码,发现其核心在于run()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Mapper.java
...

public void run(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
this.setup(context);

try {
while(context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
} finally {
cleanup(context);
}
}

...

run()方法内容很简单。方法中有一个参数context。我们不断地从中获取键值对进行操作。同时在我们实现的map()方法中,Map Task 中键值对的输出也依赖于context。这里就产生了两个问题:第一,Context 对象是何时实例化的;第二,run()方法是谁来调用的。


MapTask

在 Yarn 资源调度框架中,Application Master 会在不同节点协调分配 Container,然后启动 MapTask 和 Reduce Task。真正执行 Map Task 的就是 MapTask 类。打开 hadoop-mapreduce-client-core-2.6.5.jar 包下的 org.apache.hadoop.MapTask.java 类。我们会发现这个类中并没有 main 方法,而在实际运行中,这个类是通过反射实现的。MapTask 中至关重要的方法便是run()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// MapTask.java, decompiled .java file
...

public void run(JobConf job, TaskUmbilicalProtocol umbilical) throws IOException, ClassNotFoundException, InterruptedException {

...

if (this.isMapTask()) {
if (this.conf.getNumReduceTasks() == 0) {
this.mapPhase = this.getProgress().addPhase("map", 1.0F);
} else {
this.mapPhase = this.getProgress().addPhase("map", 0.667F);
this.sortPhase = this.getProgress().addPhase("sort", 0.333F);
}
}

...
}

...

首先看到的是 Map 和 Sort 作业的时间分配。我们发现 MR 框架允许没有 Reduce 作业存在。在这种情况下框架直接输出了 Map 的结果。在有 Reduce 作业的情况下,Map 和 Sort 的时间分配是2 : 1。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// MapTask.java, decompiled .java file
...

public void run(JobConf job, TaskUmbilicalProtocol umbilical) throws IOException, ClassNotFoundException, InterruptedException {

...

if (...) {

...

} else {
if (useNewApi) {
this.runNewMapper(job, this.splitMetaInfo, umbilical, reporter);
} else {
this.runOldMapper(job, this.splitMetaInfo, umbilical, reporter);
}

this.done(umbilical, reporter);
}
}

...

RunNewMapper()


获取 Mapper 类

接下来的代码中我们根据 API 版本(是否使用 Yarn)选择调用runNewMapper或者runOldMapper方法。这里重点看runNewMapper。这里面这样的两行代码:

1
2
3
4
5
6
7
// MapTask.java, decompiled .java file
...

TaskAttemptContext taskContext = new TaskAttemptContextImpl(job, this.getTaskID(), reporter);
Mapper<INKEY, INVALUE, OUTKEY, OUTVALUE> mapper = (Mapper)ReflectionUtils.newInstance(taskContext.getMapperClass(), job);

...

我们首先声明了一个 taskContext,任务上下文容器对象。下一行中我们通过反射得到了 Mapper 类。打开getMapperClass()方法的实现我们会发现,返回的 Mapper 类由参数MAP_CLASS_ATTR指定。如果没有指定的话则返回一个默认的 Mapper 类。

获取 InputFormat 类

继续看下一行:

1
2
3
4
5
6
// MapTask.java, decompiled .java file
...

InputFormat<INKEY, INVALUE> inputFormat = (InputFormat)ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);

...

这里声明了 InputFormat 类对象 inputFormat,获取的方式和 Mapper 类是类似的。指定 inputFormat 的参数是INPUT_FORMAT_CLASS_ATTR。如果没有指定,返回的是 TextInputFormat

获取 split

继续往下看:

1
2
3
4
5
6
7
// MapTask.java, decompiled .java file
...

org.apache.hadoop.mapreduce.InputSplit split = null;
split = (org.apache.hadoop.mapreduce.InputSplit)this.getSplitDetails(new Path(splitIndex.getSplitLocation()), splitIndex.getStartOffset());

...

切片对象我们在提交作业的过程中已经通过 InputFormat 对象完成了。这里我们获得 split 对象。可以发现有关 split 的关键信息是切片所在的节点、切片的偏移量等。

获取 RecordReader

继续往下看:

1
2
3
4
5
6
// MapTask.java, decompiled .java file
...

org.apache.hadoop.mapreduce.RecordReader<INKEY, INVALUE> input = new MapTask.NewTrackingRecordReader(split, inputFormat, reporter, taskContext);

...

这里我们实例化了一个 RecordReader 对象。我们传递了分片、InputFormat、上下文容器等任务关键的对象。到NewTrackingRecordReader()方法中看一看,会发现里面还有一行实例化 RecordReader 的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// MapTask.java, decompiled .java file
...

static class NewTrackingRecordReader<K, V> extends org.apache.hadoop.mapreduce.RecordReader<K, V> {
private final org.apache.hadoop.mapreduce.RecordReader<K, V> real;

...

this.real = inputFormat.createRecordReader(split, taskContext);

...
}

...

这里单独将它实例化说明后面会频繁调用。我们点进createRecordReader()方法的实现(TextInputFormat):

1
2
3
4
5
6
7
8
9
10
11
12
// TextInputFormat.java, decompiled .java file
...

public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
String delimiter = context.getConfiguration().get("textinputformat.record.delimiter");
byte[] recordDelimiterBytes = null;
if (null != delimiter) {
recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
}

return new LineRecordReader(recordDelimiterBytes);
}

这个方法其实没干什么事情,就是指定了一下分隔符。重点在于返回的LineRecordReader。顾名思义,这是一个按行读取数据的读取器。点开来看一下,发现里面有诸如getCurrentKey()getCurrentValue()等方法。这些方法在 NewTrackingRecordReader 中也有存在。我们构造了一个 LineRecordReader,将它赋给 NewTrackingRecordReader 中的 real 成员变量,并通过这个变量来读取数据。回过头来看,NewTrackingRecordReader 又被赋给了 MapTask 中的 input 变量。于是我们明白了,MapTask 是通过 RecordReader(具体到 TextInputFormat 则是通过 LineRecordReader)实现 split 数据读取的。

获取 RecordWriter

有了记录读取器 RecordReader,自然也会有记录书写器 RecordWriter。本篇笔记重点关注数据是如何读取的。RecordWriter 的相关内容留到下一篇笔记当中。

获取 Context

继续看:

1
2
3
4
5
6
7
8
// MapTask.java, decompiled .java file
...

MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> mapContext = new MapContextImpl(job, this.getTaskID(), input, (RecordWriter)output, this.committer, reporter, split);

org.apache.hadoop.mapreduce.Mapper.Context mapperContext = (new WrappedMapper()).getMapContext(mapContext);

...

我们实例化了 MapContext 对象。我们提供了 job、当前任务的 ID、读取器、书写器、提交器、报告器、分片等参数。打开 MapContextImpl 我们会发现参数 reader 被赋给了成员变量 reader。于是我们知道了读取器是被封装在 MapContext 容器中的。

而在下一行代码中,我们进一步将 mapContext 封装到了 mapperContext 对象中。接下来的 Map 操作会依赖于这个对象。

初始化读取的数据

继续看:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// MapTask.java, decompiled .java file
...

try {
input.initialize(split, mapperContext);
mapper.run(mapperContext);
this.mapPhase.complete();
this.setPhase(Phase.SORT);
this.statusUpdate(umbilical);
input.close();
input = null;
((RecordWriter)output).close(mapperContext);
output = null;
} finally {
this.closeQuietly((org.apache.hadoop.mapreduce.RecordReader)input);
this.closeQuietly((RecordWriter)output, mapperContext);
}

...

我们首先进行了一个初始化操作,对象是 input,即 NewTrackingRecordReader。点开看看里面有什么(LineRecordReader):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// LineRecordReader.java, decompiled .java file
...

public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
FileSplit split = (FileSplit)genericSplit;
Configuration job = context.getConfiguration();
this.maxLineLength = job.getInt("mapreduce.input.linerecordreader.line.maxlength", 2147483647);
this.start = split.getStart();
this.end = this.start + split.getLength();
Path file = split.getPath();
FileSystem fs = file.getFileSystem(job);
this.fileIn = fs.open(file);
CompressionCodec codec = (new CompressionCodecFactory(job)).getCodec(file);
if (null != codec) {

...

} else {
this.fileIn.seek(this.start);
this.in = new UncompressedSplitLineReader(this.fileIn, job, this.recordDelimiterBytes, split.getLength());
this.filePosition = this.fileIn;
}

if (this.start != 0L) {
this.start += (long)this.in.readLine(new Text(), 0, this.maxBytesToConsume(this.start));
}

this.pos = this.start;
}

...

我们注意到方法的前半部分在获得了一些关于任务的参数和对象后,我们通过fs.open()方法打开了文件输入流。这个流的seek()方法可以读取指定位置的数据(不需要每次都读取完整的文件)。在这个方法的最后我们检查了起始位置是否为 0。如果不为 0(即不是第一个分片),那么我们会丢弃第一行数据。因为除了最后一个分片以外,我们总是会通过next()方法多读取一行数据(源代码注释里面写的)。这样做可以将原始数据中因为分块被断开的行重新连接起来。

所以初始化的工作是规定了读取的切片和具体的数据。

mapper.run()

紧接着初始化之后的就是mapper.run()方法。终于看到了熟悉的 mapper 对象。至此我们就可以回答最开头的问题:调用 mapper 对象中run()方法的就是 MapTask 当中的这一行代码;mapper 对象中一直要用到的 context 则是在经历了之前的一系列封装后于此时被传入run()方法中的。


小结

  • 通过fs.open(filePath)可以打开文件输入流,输入流的.seek()方法可以帮助我们读取指定位置的数据;
  • LineRecordReader 读取 split 时,除了第一个切片以外,其余的切片在读取时都会忽略第一行;除了最后一个切片以外,其余的切片都会多读取一行。这样做是为了将某些在分块时被一分为二的行重新连接起来。