MapReduce Programming and Source Code Study (5)

In the previous several notes we built a Hadoop distributed cluster from scratch. From this point in the series onward, we put that cluster to productive use by studying and practicing the MapReduce computing framework.

This is the fifth note in the series. In it we continue analyzing the source code behind a Mapper's data output, in order to deepen our understanding of the overall flow.

Map Task Source Code Analysis (2)


Two notes ago we skipped over the RecordWriter-related code. This time we pick it up again, starting from MapTask.class:

// MapTask.class, decompiled .java file
...

RecordWriter output = null;
if (job.getNumReduceTasks() == 0) {
    // no reducers: map output is written straight to the job's output format
    output = new MapTask.NewDirectOutputCollector(taskContext, job, umbilical, reporter);
} else {
    // at least one reducer: output goes through the partitioning/sorting collector
    output = new MapTask.NewOutputCollector(taskContext, job, umbilical, reporter);
}

...

NewOutputCollector

Here we focus on NewOutputCollector. Opening it up:

// MapTask.class, decompiled .java file
...

    private class NewOutputCollector<K, V> extends RecordWriter<K, V> {
        private final MapOutputCollector<K, V> collector;

        ...

        NewOutputCollector(JobContext jobContext, JobConf job, TaskUmbilicalProtocol umbilical, TaskReporter reporter) throws IOException, ClassNotFoundException {
            this.collector = MapTask.this.createSortingCollector(job, reporter);

            ...

        }
    }
}

...

We can see that the class declares a member field of type MapOutputCollector and instantiates it through the createSortingCollector() method. Based on our understanding of the MR framework, we have good reason to guess that createSortingCollector() returns a collector that both sorts and writes data into an in-memory buffer.
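Before opening createSortingCollector(), it helps to know what MapTask expects of any collector. Roughly, the contract it programs against looks like the outline below (abbreviated from org.apache.hadoop.mapred.MapOutputCollector; the nested Context is stubbed here only so the sketch stands on its own, and the exact generics and exceptions may differ between Hadoop versions):

import java.io.IOException;

// Outline of the collector contract MapTask programs against (a sketch, not the exact source).
public interface MapOutputCollector<K, V> {

    // Stub only; the real nested Context carries the MapTask, JobConf and TaskReporter.
    class Context {
    }

    void init(Context context) throws IOException, ClassNotFoundException;

    // Called once per map output record, together with its partition number.
    void collect(K key, V value, int partition) throws IOException, InterruptedException;

    void flush() throws IOException, InterruptedException, ClassNotFoundException;

    void close() throws IOException, InterruptedException;
}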

MapOutputCollector

Let's open it up and take a look:

// MapTask.class, decompiled .java file
...

private <KEY, VALUE> MapOutputCollector<KEY, VALUE> createSortingCollector(JobConf job, TaskReporter reporter) throws IOException, ClassNotFoundException {
    Context context = new Context(this, job, reporter);
    Class<?>[] collectorClasses = job.getClasses("mapreduce.job.map.output.collector.class", new Class[]{MapTask.MapOutputBuffer.class});
    int remainingCollectors = collectorClasses.length;
    Class[] arr$ = collectorClasses;
    int len$ = collectorClasses.length;
    int i$ = 0;

    while(i$ < len$) {
        Class clazz = arr$[i$];

        try {
            if (!MapOutputCollector.class.isAssignableFrom(clazz)) {
                throw new IOException("Invalid output collector class: " + clazz.getName() + " (does not implement MapOutputCollector)");
            }

            Class<? extends MapOutputCollector> subclazz = clazz.asSubclass(MapOutputCollector.class);
            LOG.debug("Trying map output collector class: " + subclazz.getName());
            MapOutputCollector<KEY, VALUE> collector = (MapOutputCollector)ReflectionUtils.newInstance(subclazz, job);
            collector.init(context);
            LOG.info("Map output collector class = " + collector.getClass().getName());
            return collector;
        } catch (Exception var12) {
            String msg = "Unable to initialize MapOutputCollector " + clazz.getName();
            --remainingCollectors;
            if (remainingCollectors > 0) {
                msg = msg + " (" + remainingCollectors + " more collector(s) to try)";
            }

            LOG.warn(msg, var12);
            ++i$;
        }
    }

    throw new IOException("Unable to initialize any output collector");
}

...

A long stretch of code, but the core of it is a single thing: the MapOutputBuffer class. If mapreduce.job.map.output.collector.class is not set, MapOutputBuffer is the default data collector, and as its name suggests, the data it collects is written into a buffer. Writing a custom collector is quite involved, so we will not study that in detail here. MapOutputBuffer is instantiated via reflection and then initialized by collector.init(context); next we will walk through that initialization.
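Although we won't dig into custom collectors, for completeness here is a minimal driver sketch of how that property could be overridden. com.example.MyCollector is a hypothetical class name used purely for illustration; it would have to implement MapOutputCollector, and leaving the property unset keeps MapOutputBuffer as the default.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

// Sketch only: pointing MapTask at a hypothetical custom collector.
public class CustomCollectorDriver {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // "com.example.MyCollector" is a placeholder; a real value must name a class
        // implementing MapOutputCollector, or createSortingCollector() rejects it.
        conf.set("mapreduce.job.map.output.collector.class", "com.example.MyCollector");
        Job job = Job.getInstance(conf, "custom-collector-demo");
        // ... normal job setup (mapper, reducer, input/output paths) would follow ...
    }
}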

MapOutputBuffer Initialization

// MapTask.MapOutputBuffer.init(), decompiled .java file
public void init(Context context) throws IOException, ClassNotFoundException {
    this.job = context.getJobConf();
    this.reporter = context.getReporter();
    this.mapTask = context.getMapTask();
    this.mapOutputFile = this.mapTask.getMapOutputFile();
    this.sortPhase = this.mapTask.getSortPhase();
    this.spilledRecordsCounter = this.reporter.getCounter(TaskCounter.SPILLED_RECORDS);
    this.partitions = this.job.getNumReduceTasks();
    this.rfs = FileSystem.getLocal(this.job).getRaw();
    float spillper = this.job.getFloat("mapreduce.map.sort.spill.percent", 0.8F);
    int sortmb = this.job.getInt("mapreduce.task.io.sort.mb", 100);
    this.indexCacheMemoryLimit = this.job.getInt("mapreduce.task.index.cache.limit.bytes", 1048576);
    if (spillper <= 1.0F && spillper > 0.0F) {
        if ((sortmb & 2047) != sortmb) {
            throw new IOException("Invalid \"mapreduce.task.io.sort.mb\": " + sortmb);
        } else {
            // the in-memory sorter, QuickSort by default (map.sort.class)
            this.sorter = (IndexedSorter)ReflectionUtils.newInstance(this.job.getClass("map.sort.class", QuickSort.class, IndexedSorter.class), this.job);
            // allocate the kvbuffer: sortmb megabytes, trimmed to a multiple of 16 bytes
            int maxMemUsage = sortmb << 20;
            maxMemUsage -= maxMemUsage % 16;
            this.kvbuffer = new byte[maxMemUsage];
            this.bufvoid = this.kvbuffer.length;
            this.kvmeta = ByteBuffer.wrap(this.kvbuffer).order(ByteOrder.nativeOrder()).asIntBuffer();
            this.setEquator(0);
            this.bufstart = this.bufend = this.bufindex = this.equator;
            this.kvstart = this.kvend = this.kvindex;
            this.maxRec = this.kvmeta.capacity() / 4;
            // spilling begins once the buffer is spillper (default 80%) full
            this.softLimit = (int)((float)this.kvbuffer.length * spillper);
            this.bufferRemaining = this.softLimit;
            MapTask.LOG.info("mapreduce.task.io.sort.mb: " + sortmb);
            MapTask.LOG.info("soft limit at " + this.softLimit);
            MapTask.LOG.info("bufstart = " + this.bufstart + "; bufvoid = " + this.bufvoid);
            MapTask.LOG.info("kvstart = " + this.kvstart + "; length = " + this.maxRec);
            this.comparator = this.job.getOutputKeyComparator();
            this.keyClass = this.job.getMapOutputKeyClass();
            this.valClass = this.job.getMapOutputValueClass();
            this.serializationFactory = new SerializationFactory(this.job);
            this.keySerializer = this.serializationFactory.getSerializer(this.keyClass);
            this.keySerializer.open(this.bb);
            this.valSerializer = this.serializationFactory.getSerializer(this.valClass);
            this.valSerializer.open(this.bb);
            this.mapOutputByteCounter = this.reporter.getCounter(TaskCounter.MAP_OUTPUT_BYTES);
            this.mapOutputRecordCounter = this.reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
            this.fileOutputByteCounter = this.reporter.getCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES);
            if (this.job.getCompressMapOutput()) {
                Class<? extends CompressionCodec> codecClass = this.job.getMapOutputCompressorClass(DefaultCodec.class);
                this.codec = (CompressionCodec)ReflectionUtils.newInstance(codecClass, this.job);
            } else {
                this.codec = null;
            }

            Counter combineInputCounter = this.reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS);
            this.combinerRunner = CombinerRunner.create(this.job, this.getTaskID(), combineInputCounter, this.reporter, (OutputCommitter)null);
            if (this.combinerRunner != null) {
                Counter combineOutputCounter = this.reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
                this.combineCollector = new CombineOutputCollector(combineOutputCounter, this.reporter, this.job);
            } else {
                this.combineCollector = null;
            }

            this.spillInProgress = false;
            this.minSpillsForCombine = this.job.getInt("mapreduce.map.combine.minspills", 3);
            // start the background spill thread (a daemon) and wait until it is running
            this.spillThread.setDaemon(true);
            this.spillThread.setName("SpillThread");
            this.spillLock.lock();

            try {
                this.spillThread.start();

                while(!this.spillThreadRunning) {
                    this.spillDone.await();
                }
            } catch (InterruptedException var10) {
                throw new IOException("Spill thread failed to initialize", var10);
            } finally {
                this.spillLock.unlock();
            }

            if (this.sortSpillException != null) {
                throw new IOException("Spill thread failed to initialize", this.sortSpillException);
            }
        }
    } else {
        throw new IOException("Invalid \"mapreduce.map.sort.spill.percent\": " + spillper);
    }
}

Another long one. In short: the buffer defaults to 100 MB (mapreduce.task.io.sort.mb), and by default a spill to disk starts once it is 80% full (mapreduce.map.sort.spill.percent). The sorter defaults to QuickSort (configurable via map.sort.class), and a daemon SpillThread is started to perform the spills in the background.
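These defaults can be tuned per job through the same configuration keys that init() reads. The sketch below is purely illustrative (the values are arbitrary, not recommendations); note from the (sortmb & 2047) check above that mapreduce.task.io.sort.mb must stay below 2048.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

// Illustrative sketch: tuning the map-side sort buffer seen in init().
public class SortBufferTuning {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        conf.setInt("mapreduce.task.io.sort.mb", 200);           // kvbuffer size: 200 MB instead of 100 MB
        conf.setFloat("mapreduce.map.sort.spill.percent", 0.9f); // start spilling at 90% instead of 80%
        Job job = Job.getInstance(conf, "sort-buffer-tuning-demo");
        // ... mapper/reducer/input/output setup omitted ...
    }
}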

Number of Partitions and the Partitioner

Back in NewOutputCollector: on the line right after the collector is obtained, we get the number of partitions:

// MapTask.class, decompiled .java file

    private class NewOutputCollector<K, V> extends RecordWriter<K, V> {
        private final MapOutputCollector<K, V> collector;

        ...

        NewOutputCollector(JobContext jobContext, JobConf job, TaskUmbilicalProtocol umbilical, TaskReporter reporter) throws IOException, ClassNotFoundException {
            this.partitions = jobContext.getNumReduceTasks();
            if (this.partitions > 1) {
                this.partitioner = (Partitioner)ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
            } else {
                this.partitioner = new Partitioner<K, V>() {
                    public int getPartition(K key, V value, int numPartitions) {
                        return NewOutputCollector.this.partitions - 1;
                    }
                };
            }

            ...

        }
    }
}

As the name getNumReduceTasks() makes clear, the number of partitions equals the number of reducers. If there is more than one partition, the Partitioner class configured for the job is instantiated, so we can supply our own (the default is the hash partitioner, HashPartitioner); if there is exactly one partition, the anonymous inner class above simply returns partitions - 1, i.e. every record goes to partition 0.
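As a concrete illustration (a sketch of my own, not code from MapTask), a custom partitioner only needs to extend Partitioner and override getPartition(). The hypothetical FirstCharPartitioner below routes records by the first character of a Text key, masking off the sign bit and taking the result modulo numPartitions, which is the same trick the default hash partitioner applies to the key's hash code. It would be registered in the driver with job.setPartitionerClass(FirstCharPartitioner.class) and, as the constructor above shows, only takes effect when the job has more than one reduce task.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Minimal custom partitioner sketch for Text keys and IntWritable values.
public class FirstCharPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        // Example policy: partition by the first character of the key;
        // empty keys fall back to character 0.
        int firstChar = key.getLength() > 0 ? key.charAt(0) : 0;
        return (firstChar & Integer.MAX_VALUE) % numPartitions;
    }
}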

Summary

  • A custom partitioner can be defined by extending the Partitioner class; if none is defined, the hash partitioner is used by default;