In the previous several notes we built a Hadoop distributed cluster from scratch. Starting with this note, we will turn that cluster into something productive by studying and practicing the MapReduce computing framework.
This is the fifth note in the series. Here we continue reading the source code behind Mapper data output, to deepen our understanding of the overall flow.
Map Task Source Code Analysis (Part 2)
In the note before last we skipped over the RecordWriter-related code. Let's pick that thread back up, starting again from MapTask.class:
```java
// ...
RecordWriter output = null;
if (job.getNumReduceTasks() == 0) {
    output = new MapTask.NewDirectOutputCollector(taskContext, job, umbilical, reporter);
} else {
    output = new MapTask.NewOutputCollector(taskContext, job, umbilical, reporter);
}
// ...
```
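The branch above can be summed up in a toy sketch (this is not Hadoop code, just a model of the rule): with zero reduce tasks, map output is written directly to the final output with no sort or spill; otherwise it goes through the sorting collector.

```java
// Toy model of MapTask's writer selection rule (not Hadoop's API):
// zero reducers -> direct writer, otherwise -> sorting collector.
public class WriterChoice {
    public static String chooseWriter(int numReduceTasks) {
        return numReduceTasks == 0 ? "NewDirectOutputCollector" : "NewOutputCollector";
    }
}
```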
NewOutputCollector
Here we focus on NewOutputCollector. Opening it:
```java
// ...
private class NewOutputCollector<K, V> extends RecordWriter<K, V> {
    private final MapOutputCollector<K, V> collector;
    // ...

    NewOutputCollector(JobContext jobContext, JobConf job,
                       TaskUmbilicalProtocol umbilical, TaskReporter reporter)
            throws IOException, ClassNotFoundException {
        this.collector = MapTask.this.createSortingCollector(job, reporter);
        // ...
    }
}
// ...
```
We can see that NewOutputCollector holds a member variable of type MapOutputCollector and instantiates it through createSortingCollector(). From what we know of the MR framework, we have good reason to guess that createSortingCollector() returns a collector that both sorts the data and writes it into an in-memory buffer.
MapOutputCollector
Let's open it up:
```java
// ...
private <KEY, VALUE> MapOutputCollector<KEY, VALUE> createSortingCollector(JobConf job, TaskReporter reporter)
        throws IOException, ClassNotFoundException {
    Context context = new Context(this, job, reporter);
    Class<?>[] collectorClasses = job.getClasses(
            "mapreduce.job.map.output.collector.class",
            new Class[]{MapTask.MapOutputBuffer.class});
    int remainingCollectors = collectorClasses.length;

    for (Class clazz : collectorClasses) {
        try {
            if (!MapOutputCollector.class.isAssignableFrom(clazz)) {
                throw new IOException("Invalid output collector class: " + clazz.getName()
                        + " (does not implement MapOutputCollector)");
            }
            Class<? extends MapOutputCollector> subclazz = clazz.asSubclass(MapOutputCollector.class);
            LOG.debug("Trying map output collector class: " + subclazz.getName());
            MapOutputCollector<KEY, VALUE> collector =
                    (MapOutputCollector) ReflectionUtils.newInstance(subclazz, job);
            collector.init(context);
            LOG.info("Map output collector class = " + collector.getClass().getName());
            return collector;
        } catch (Exception e) {
            String msg = "Unable to initialize MapOutputCollector " + clazz.getName();
            if (--remainingCollectors > 0) {
                msg = msg + " (" + remainingCollectors + " more collector(s) to try)";
            }
            LOG.warn(msg, e);
        }
    }
    throw new IOException("Unable to initialize any output collector");
}
// ...
```
Long as it is, the core is a single class: MapOutputBuffer. Unless we set mapreduce.job.map.output.collector.class, MapOutputBuffer is the default data collector, and as its name suggests, the data it collects is written into a buffer. Writing a custom collector is quite involved, so we won't study that here. The MapOutputBuffer instance is created via reflection and then initialized through collector.init(context); next we will walk through that initialization.
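The fallback pattern in createSortingCollector() — try each configured class in order, return the first one that initializes, give up only when all fail — can be modeled in plain Java without any Hadoop dependencies. The class and method names below are invented for illustration; BrokenCollector stands in for a misconfigured custom collector, DefaultCollector for MapOutputBuffer.

```java
// A minimal sketch (not Hadoop's API) of the try-each-candidate pattern
// used by createSortingCollector().
public class CollectorChooser {
    public interface Collector {
        void init() throws Exception;
    }

    // Stands in for a broken custom collector class: init() always fails.
    public static class BrokenCollector implements Collector {
        public void init() throws Exception { throw new Exception("boom"); }
    }

    // Stands in for MapOutputBuffer: initializes without error.
    public static class DefaultCollector implements Collector {
        public void init() { /* allocate buffers, start spill thread, ... */ }
    }

    public static Collector create(Class<?>... classes) {
        for (Class<?> clazz : classes) {
            try {
                if (!Collector.class.isAssignableFrom(clazz)) {
                    throw new RuntimeException(clazz.getName() + " does not implement Collector");
                }
                Collector c = (Collector) clazz.getDeclaredConstructor().newInstance();
                c.init();       // may throw; then we fall through to the next candidate
                return c;
            } catch (Exception e) {
                // warn and try the next candidate, exactly as MapTask does
            }
        }
        throw new RuntimeException("Unable to initialize any output collector");
    }
}
```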
MapOutputBuffer Initialization
```java
public void init(Context context) throws IOException, ClassNotFoundException {
    this.job = context.getJobConf();
    this.reporter = context.getReporter();
    this.mapTask = context.getMapTask();
    this.mapOutputFile = this.mapTask.getMapOutputFile();
    this.sortPhase = this.mapTask.getSortPhase();
    this.spilledRecordsCounter = this.reporter.getCounter(TaskCounter.SPILLED_RECORDS);
    this.partitions = this.job.getNumReduceTasks();
    this.rfs = FileSystem.getLocal(this.job).getRaw();
    float spillper = this.job.getFloat("mapreduce.map.sort.spill.percent", 0.8F);
    int sortmb = this.job.getInt("mapreduce.task.io.sort.mb", 100);
    this.indexCacheMemoryLimit = this.job.getInt("mapreduce.task.index.cache.limit.bytes", 1048576);
    if (spillper <= 1.0F && spillper > 0.0F) {
        if ((sortmb & 2047) != sortmb) {
            throw new IOException("Invalid \"mapreduce.task.io.sort.mb\": " + sortmb);
        } else {
            this.sorter = (IndexedSorter) ReflectionUtils.newInstance(
                    this.job.getClass("map.sort.class", QuickSort.class, IndexedSorter.class), this.job);
            int maxMemUsage = sortmb << 20;
            maxMemUsage -= maxMemUsage % 16;
            this.kvbuffer = new byte[maxMemUsage];
            this.bufvoid = this.kvbuffer.length;
            this.kvmeta = ByteBuffer.wrap(this.kvbuffer).order(ByteOrder.nativeOrder()).asIntBuffer();
            this.setEquator(0);
            this.bufstart = this.bufend = this.bufindex = this.equator;
            this.kvstart = this.kvend = this.kvindex;
            this.maxRec = this.kvmeta.capacity() / 4;
            this.softLimit = (int) ((float) this.kvbuffer.length * spillper);
            this.bufferRemaining = this.softLimit;
            MapTask.LOG.info("mapreduce.task.io.sort.mb: " + sortmb);
            MapTask.LOG.info("soft limit at " + this.softLimit);
            MapTask.LOG.info("bufstart = " + this.bufstart + "; bufvoid = " + this.bufvoid);
            MapTask.LOG.info("kvstart = " + this.kvstart + "; length = " + this.maxRec);
            this.comparator = this.job.getOutputKeyComparator();
            this.keyClass = this.job.getMapOutputKeyClass();
            this.valClass = this.job.getMapOutputValueClass();
            this.serializationFactory = new SerializationFactory(this.job);
            this.keySerializer = this.serializationFactory.getSerializer(this.keyClass);
            this.keySerializer.open(this.bb);
            this.valSerializer = this.serializationFactory.getSerializer(this.valClass);
            this.valSerializer.open(this.bb);
            this.mapOutputByteCounter = this.reporter.getCounter(TaskCounter.MAP_OUTPUT_BYTES);
            this.mapOutputRecordCounter = this.reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
            this.fileOutputByteCounter = this.reporter.getCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES);
            if (this.job.getCompressMapOutput()) {
                Class<? extends CompressionCodec> codecClass =
                        this.job.getMapOutputCompressorClass(DefaultCodec.class);
                this.codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, this.job);
            } else {
                this.codec = null;
            }
            Counter combineInputCounter = this.reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS);
            this.combinerRunner = CombinerRunner.create(this.job, this.getTaskID(),
                    combineInputCounter, this.reporter, (OutputCommitter) null);
            if (this.combinerRunner != null) {
                Counter combineOutputCounter = this.reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
                this.combineCollector = new CombineOutputCollector(combineOutputCounter, this.reporter, this.job);
            } else {
                this.combineCollector = null;
            }
            this.spillInProgress = false;
            this.minSpillsForCombine = this.job.getInt("mapreduce.map.combine.minspills", 3);
            this.spillThread.setDaemon(true);
            this.spillThread.setName("SpillThread");
            this.spillLock.lock();
            try {
                this.spillThread.start();
                while (!this.spillThreadRunning) {
                    this.spillDone.await();
                }
            } catch (InterruptedException e) {
                throw new IOException("Spill thread failed to initialize", e);
            } finally {
                this.spillLock.unlock();
            }
            if (this.sortSpillException != null) {
                throw new IOException("Spill thread failed to initialize", this.sortSpillException);
            }
        }
    } else {
        throw new IOException("Invalid \"mapreduce.map.sort.spill.percent\": " + spillper);
    }
}
```
Quite a mouthful. In short: the buffer defaults to 100 MB (mapreduce.task.io.sort.mb), a spill to disk begins once it is 80% full (mapreduce.map.sort.spill.percent), and the sorter defaults to QuickSort.
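The buffer arithmetic above can be checked in plain Java, with no Hadoop on the classpath (the class and method names here are ours, for illustration only). Note the `(sortmb & 2047) != sortmb` test: it requires sortmb to fit in 11 bits, i.e. to be less than 2048, so that `sortmb << 20` cannot overflow an int.

```java
// Mirrors the arithmetic in MapOutputBuffer.init() (our own class, not Hadoop's).
public class SortBufferMath {
    // The validity test on mapreduce.task.io.sort.mb: must fit in 11 bits (< 2048).
    public static boolean validSortMb(int sortmb) {
        return (sortmb & 2047) == sortmb;
    }

    // The spill soft limit: sortmb megabytes, trimmed to a multiple of 16 bytes,
    // times the spill percentage.
    public static int softLimit(int sortmb, float spillper) {
        int maxMemUsage = sortmb << 20;    // MB -> bytes
        maxMemUsage -= maxMemUsage % 16;   // keep the buffer a multiple of 16 bytes
        return (int) ((float) maxMemUsage * spillper);
    }
}
```

With the defaults (100 MB, 0.8), the soft limit works out to 83,886,080 bytes, matching the "soft limit at" line that MapOutputBuffer logs on startup.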
Partition Count and the Partitioner
Back in NewOutputCollector, on the line after the collector is obtained we get the number of partitions:
```java
private class NewOutputCollector<K, V> extends RecordWriter<K, V> {
    private final MapOutputCollector<K, V> collector;
    // ...

    NewOutputCollector(JobContext jobContext, JobConf job,
                       TaskUmbilicalProtocol umbilical, TaskReporter reporter)
            throws IOException, ClassNotFoundException {
        this.partitions = jobContext.getNumReduceTasks();
        if (this.partitions > 1) {
            this.partitioner = (Partitioner) ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
        } else {
            this.partitioner = new Partitioner<K, V>() {
                public int getPartition(K key, V value, int numPartitions) {
                    return NewOutputCollector.this.partitions - 1;
                }
            };
        }
        // ...
    }
}
```
As the name getNumReduceTasks() suggests, the number of partitions equals the number of Reducers. When there is more than one partition, we can supply our own partitioner class (the default is the hash partitioner); when there is exactly one, the anonymous inner class above always returns partitions - 1, which is simply partition number 0.
Summary
- A custom partitioner can be defined by extending the Partitioner class; if none is defined, the hash partitioner is used by default;