0%

MapReduce 编程和源代码学习(二)

在之前的数篇笔记中我们从零开始完成了 Hadoop 分布式集群的搭建。从这一篇笔记开始,我们要通过 MapReduce 计算框架的学习和实践,将集群转化为生产力。

本篇是系列的第二篇笔记。本篇笔记中我们将会分析学习客户端作业提交相关源码,从而加深对相关流程的理解。

MapReduce 客户端作业提交源码分析

回忆上一篇笔记中我们手写的 WordCount 程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// MyWordCount.java
public class MyWordCount {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

Configuration conf = new Configuration();
Job job = Job.getInstance(conf);

...

Path inPath = new Path(args[0]);
FileInputFormat.addInputPath(job, inPath);

...

job.setMapperClass(MyMapper.class);

...

job.setReducerClass(MyReducer.class);

System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

我们定义了任务的输入输出路径,指定了我们自己实现 Mapper 和 Reducer 类。这次我们重点关注,当我们将计算的细节定义完成后,任务提交的细节是怎样的。

作业参数

首先打开 Job 类定义(Mac 版的 Intellij 中只需要按住 Command 键再点击类的名称即可):

1
2
3
4
5
6
7
8
// Job.java
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class Job extends JobContextImpl implements JobContext {

...

}

Job 类实现了 JobContext 接口,而 JobContext 继承自 MRJobConfig 接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// MRJobConfig.java
@InterfaceAudience.Private
@InterfaceStability.Evolving
public interface MRJobConfig {

// Put all of the attribute names in here so that Job and JobContext are
// consistent.
public static final String INPUT_FORMAT_CLASS_ATTR = "mapreduce.job.inputformat.class";

public static final String MAP_CLASS_ATTR = "mapreduce.job.map.class";

public static final String MAP_OUTPUT_COLLECTOR_CLASS_ATTR
= "mapreduce.job.map.output.collector.class";
...

}

MRJobConfig 中定义了大量的常量,而这些常量就是我们在定义计算任务时需要修改的参数。例如:

1
public static final String REDUCE_CLASS_ATTR = "mapreduce.job.reduce.class";

定义了默认的 Reduce 类。再比如:

1
public static final int DEFAULT_MAP_MEMORY_MB = 1024;

定义了默认的 Map 作业内存大小。由此我们可知,Job 类通过继承 JobContext 和 MRJobConfig 获得了作业相关参数配置。我们也知道了,Job 类中的一系列 set、add 方法,本质就是在修改这些参数的值。

作业提交

MyWordCount 中我们使用了 waitForCompletion 方法来提交任务。现在我们来看看这个方法具体都有哪些细节:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// Job.java
...

public boolean waitForCompletion(boolean verbose
) throws IOException, InterruptedException,
ClassNotFoundException {
if (state == JobState.DEFINE) {
submit();
}
if (verbose) {
monitorAndPrintJob();
} else {
// get the completion poll interval from the client.
int completionPollIntervalMillis =
Job.getCompletionPollInterval(cluster.getConf());
while (!isComplete()) {
try {
Thread.sleep(completionPollIntervalMillis);
} catch (InterruptedException ie) {
}
}
}
return isSuccessful();
}

...

verbose 参数很简单,作业运行时是否输出信息。这里真正重要的内容是submit()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// Job.java
/**
* Submit the job to the cluster and return immediately.
* @throws IOException
*/
public void submit()
throws IOException, InterruptedException, ClassNotFoundException {
ensureState(JobState.DEFINE);
setUseNewAPI();
connect();
final JobSubmitter submitter =
getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
public JobStatus run() throws IOException, InterruptedException,
ClassNotFoundException {
return submitter.submitJobInternal(Job.this, cluster);
}
});
state = JobState.RUNNING;
LOG.info("The url to track the job: " + getTrackingURL());
}

ensureState会检查作业状态是否为 DEFINE(另一种作业状态是 RUNNING),同时也会检查作业是否与 JobTracker 相关联。setUseNewAPI()顾名思义,使用新的 Mapper 和 Reducer API。connect()检查并指定集群信息。再往下看会发现作业提交的重点在于submitter.submitJobInternal()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// JobSubmitter.java
...

/**
* Internal method for submitting jobs to the system.
*
* <p>The job submission process involves:
* <ol>
* <li>
* Checking the input and output specifications of the job.
* </li>
* <li>
* Computing the {@link InputSplit}s for the job.
* </li>
* <li>
* Setup the requisite accounting information for the
* {@link DistributedCache} of the job, if necessary.
* </li>
* <li>
* Copying the job's jar and configuration to the map-reduce system
* directory on the distributed file-system.
* </li>
* <li>
* Submitting the job to the <code>JobTracker</code> and optionally
* monitoring it's status.
* </li>
* </ol></p>
* @param job the configuration to submit
* @param cluster the handle to the Cluster
* @throws ClassNotFoundException
* @throws InterruptedException
* @throws IOException
*/
JobStatus submitJobInternal(Job job, Cluster cluster)
throws ClassNotFoundException, InterruptedException, IOException {

...

}

...

这个方法的注释告诉了我们作业提交的步骤:

  1. Checking the input and output specifications of the job.
  2. Computing the InputSplits for the job.
  3. Setup the requisite accounting information for the DistributedCache of the job, if necessary.
  4. Copying the job’s jar and configuration to the map-reduce system directory on the distributed file-system.
  5. Submitting the job to the JobTracker and optionally monitoring it’s status.

获取 split 列表,计算 split 切片数量、写 split

在 JobSubmitter 源代码 197 行有:

1
2
3
4
5
...

int maps = writeSplits(job, submitJobDir);

...

这里通过计算 split 的数量得到了 map 的数量(map 数量和 split 数量是相等的)。点开writeSplits()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// JobSubmitter.java
...

private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
Path jobSubmitDir) throws IOException,
InterruptedException, ClassNotFoundException {
JobConf jConf = (JobConf)job.getConfiguration();
int maps;
if (jConf.getUseNewMapper()) {
maps = writeNewSplits(job, jobSubmitDir);
} else {
maps = writeOldSplits(jConf, jobSubmitDir);
}
return maps;
}

...

可以看到 API 的不同切分 split 的方法也不同。这里我们点开新的切片方法writeNewSplits

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// JobSubmitter.java
...

@SuppressWarnings("unchecked")
private <T extends InputSplit>
int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
InterruptedException, ClassNotFoundException {
Configuration conf = job.getConfiguration();
InputFormat<?, ?> input =
ReflectionUtils.newInstance(job.getInputFormatClass(), conf);

List<InputSplit> splits = input.getSplits(job);
T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

// sort the splits into order based on size, so that the biggest
// go first
Arrays.sort(array, new SplitComparator());
JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
jobSubmitDir.getFileSystem(conf), array);
return array.length;
}

...

我们发现返回的切片数量其实是某个数组的长度,而这个数组来源于切片对象类数组List<InputSplit> splits = input.getSplits(job);。我们现在想知道系统到底是如何获得切片的。使用的方法是input.getSplits()。这里的 input 是使用反射生成的InputFormat的实现类,于是我们点开job.getInputFormatClass()方法:

注意 job 的类型是 JobContext,是一个抽象类。我们这里要打开的应该是它的实现 JobContextImpl

1
2
3
4
5
6
7
8
9
10
// JobContextImpl.java
...

public Class<? extends InputFormat<?,?>> getInputFormatClass()
throws ClassNotFoundException {
return (Class<? extends InputFormat<?,?>>)
conf.getClass(INPUT_FORMAT_CLASS_ATTR, TextInputFormat.class);
}

...

通过查看getClass()方法我们发现这里返回的实际就是TextInputFormat对象(我们没有定义INPUT_FORMAT_CLASS_ATTR)。那么问题来了,什么是 InputFormat 呢?根据代码中的注释,InputFormat 描述了输入数据的规格。MR 框架依赖该描述来:

  1. 验证作业的输入格式
  2. 将数据文件拆分成逻辑分片(InputSplit),并为每一个分片分配一个 Mapper
  3. 创建用于搜集输入数据给 Mapper 的RecordReader

简单来说,有了 InputFormat,MR 框架才知道应该如何拆分数据,形成逻辑切片。我们继续看 TextInputFormat 的getSplits()方法。发现在 TextInputFormat 类中找不到该方法,那么该方法一定是在它的父类中。于是我们在 FileInputFormat 类中找到该方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// FileInputFormat.java
...

public List<InputSplit> getSplits(JobContext job) throws IOException {
Stopwatch sw = new Stopwatch().start();
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);

// generate splits
List<InputSplit> splits = new ArrayList<InputSplit>();
List<FileStatus> files = listStatus(job);

...
}

...

先看这一小段。首先我们通过 minSize 和 maxSize 规定了切片大小的边界。进一步点开看的话还能发现,默认最小值为 1(字节),最大值为Long.MAX_VALUE。我们还知道了可以通过定义SPLIT_MINSIZESPLIT_MAXSIZE来改变切片大小范围。

然后我们创建了一个空容器splits用来装切片对象。我们用listStatus()方法创建了 FileStatus 列表。FileStatus 对象其实就是封装了文件和目录元数据的对象。于是我们在这里得到了作业相关的文件信息列表。继续往下看:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// FileInputFormat.java
...

for (FileStatus file: files) {
Path path = file.getPath();
long length = file.getLen();
if (length != 0) {

...

} else {
//Create empty hosts array for zero length files
splits.add(makeSplit(path, 0, length, new String[0]));
}
}

...

可以看到我们遍历了文件列表,得到文件的路径和长度。接下来会进行一个判断,判断文件长度是否为 0。如果为 0,就创建一个空列表来表示它。

继续看,如果文件长度不为 0 会怎样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// FileInputFormat.java
...

if (length != 0) {
BlockLocation[] blkLocations;
if (file instanceof LocatedFileStatus) {
blkLocations = ((LocatedFileStatus) file).getBlockLocations();
} else {
FileSystem fs = path.getFileSystem(job.getConfiguration());
blkLocations = fs.getFileBlockLocations(file, 0, length);
}
if (isSplitable(job, path)) {
long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(blockSize, minSize, maxSize);

long bytesRemaining = length;
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
bytesRemaining -= splitSize;
}

if (bytesRemaining != 0) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
}
} else { // not splitable
splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
blkLocations[0].getCachedHosts()));
}
}

...

首先判断文件是否是 LocatedFileStatus,即是否来源于本地文件系统(区别于 HDFS 文件系统)。是和不是都有对应的获取 Block 位置的方法。其中读取 HDFS 上 Block 位置信息的方法值得记一记,其它地方有时也会用上。

接下来会判断文件是否可以切分。如果可以切分,通过 blockSize、minSize 和 maxSize 计算出切片的大小。默认切片大小等于 Block 大小。接下来重点看切分的过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// FileInputFormat.java
...

long bytesRemaining = length;
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
bytesRemaining -= splitSize;
}

if (bytesRemaining != 0) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
}

...

首先定义了一个变量 bytesRemaining 等于文件的长度。循环条件中的 SPLIT_SLOP 值为 1.1。在循环体内,我们首先将 Block 位置信息和偏移量作为参数传递给getBlockIndex()方法获得 Block 的索引值。之后通过makeSplit()方法创建切片。makeSplit()方法需要传递的参数包括文件路径、起始位置(类似于偏移量)、需要处理的文件长度、保存有 Block 的主机列表、在内存中储存有 Block 的主机列表。这些信息将用于 Split 相关的计算。

到这一步我们就已经获得了 splits 列表了。根据这个列表的大小我们一路追溯,知道了需要多少个 Mapper。同时在writeNewSplits()方法的最后,我们也将 splits 文件写到了磁盘当中(点开来可以发现一个文件输出流)。


小结

这篇笔记中我们重点分析的其实是作业提交过程中 Mapper 数量的取得过程。由此引申出 Split 切片的获得过程。另外我们也了解了和作业相关的一些参数配置在哪里(JobContext、MRJobConfig)。知道了这一点,配置参数就更加容易了。值得注意的是 InputFormat 类。在这部分的源码中我们通过该类实现了数据的逻辑切片。在后面的源码分析中还会多次见到 InputFormat 类。另外我们现在得到的 splits 只是一个逻辑的切片,其中并不包含真正的数据。