
MapReduce Programming and Source Code Study (Part 4)

In the previous notes we built a distributed Hadoop cluster from scratch. Starting with this note, we turn that cluster into something productive by studying and practicing the MapReduce computing framework.

This is the fourth note in the series. Here we step away from the source code for a while and work through examples to deepen our understanding of the MR framework and our ability to apply it.

Finding the Two Hottest Days of Each Month

The raw data is weather-station temperature readings pulled from a database, in the following form:

1949-10-01 14:21:02	34c
1949-10-01 19:21:02 38c
1949-10-02 14:01:02 36c
1950-01-01 11:21:02 32c
1950-10-01 12:21:02 37c
1951-12-01 12:21:02 23c
1950-10-02 12:21:02 41c
1950-10-03 12:21:02 27c
1951-07-01 12:21:02 45c
1951-07-02 12:21:02 46c
1951-07-03 12:21:03 47c

That is, the data has three columns: date, time of day, and temperature. We want to find the two hottest days of each month and their temperatures.
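For the sample data above, the result should therefore contain at most two days per month:

1949-10: day 1 (38c) and day 2 (36c)
1950-1: day 1 (32c)
1950-10: day 2 (41c) and day 1 (37c)
1951-7: day 3 (47c) and day 2 (46c)
1951-12: day 1 (23c)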


Problem Analysis

Map Task and Reduce Task

First we need to be clear about what the Map job and the Reduce job each have to do. Comparing the raw data with the goal, the time-of-day column is not needed in the result, so it gets dropped in the Map job. Next, records of the same month have to be grouped together and handed to one Reducer. To keep the Reduce job simple, it is best to sort the records of each month by temperature in descending order already on the Map side, which means sorting on two things during the Map phase: date and temperature. The Reduce job then receives the temperatures in sorted order, but it cannot simply take the two highest values as the answer, because they might fall on the same day. So the Reduce job also has to filter out records that share a date.
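To see why the same-day filter matters, suppose a month arrived at the Reducer sorted by temperature as (1949-10-1, 38), (1949-10-1, 37), (1949-10-2, 36), a hypothetical variation of the sample. Simply taking the first two records would report two readings from October 1st; checking the day lets the Reducer skip the second record and correctly report the 38 on the 1st and the 36 on the 2nd.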


Code

MyMapper

package org.yifan.highestTemperatures;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.StringUtils;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

public class MyMapper extends Mapper<LongWritable, Text, Temperature, IntWritable> {

Temperature outKey = new Temperature();
IntWritable outValue = new IntWritable();

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

String[] words = StringUtils.split(value.toString(), ' ');

String pattern = "yyyy-MM-dd";
SimpleDateFormat simpleDateFormat = new SimpleDateFormat(pattern);
try {
Date date = simpleDateFormat.parse(words[0]);
Calendar cal = Calendar.getInstance();
cal.setTime(date);

outKey.setYear(cal.get(Calendar.YEAR));
outKey.setMonth(cal.get(Calendar.MONTH) + 1);
outKey.setDay(cal.get(Calendar.DAY_OF_MONTH));

int temp = Integer.parseInt(words[2].substring(0, words[2].lastIndexOf("c")));
outKey.setTemp(temp);
outValue.set(temp);

context.write(outKey, outValue);

} catch (ParseException e) {
e.printStackTrace();
}

}
}

We split each line with StringUtils and then use SimpleDateFormat to pull out the year, month, and day (note that Calendar.MONTH is zero-based, hence the +1). The Temperature class, which wraps the year, month, day, and temperature, serves as the Key, and the temperature itself is the Value of the emitted KV pair.

MyReducer

package org.yifan.highestTemperatures;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class MyReducer extends Reducer<Temperature, IntWritable, Text, IntWritable> {

Text t = new Text();
IntWritable i = new IntWritable();

@Override
protected void reduce(Temperature key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

int flag = 0;
int day = 0;

for(IntWritable val: values) {

if(flag == 0) {
t.set(key.toString());
i.set(val.get());
context.write(t, i);
flag++;
day = key.getDay();
}

if(flag != 0 && day != key.getDay()) {
t.set(key.toString());
i.set(val.get());
context.write(t, i);
return;
}
}
}
}

Each group contains the records of one month. The sort comparator and grouping comparator (shown below) guarantee that months are in ascending order and that temperatures within a month are in descending order, so the first record of a group is always one of the two days we want. After that we only need to compare days to find the second record. (While iterating over the values, Hadoop keeps deserializing the next record into the same key object, which is why key.getDay() changes inside the loop even though reduce() was called with a single key.)

MyTask

package org.yifan.highestTemperatures;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class MyTask {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

// 1. Configure the job
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);

job.setJarByClass(MyTask.class);
job.setJobName("weather");

// 2. Set the input and output paths
Path inPath = new Path(args[0]);
FileInputFormat.addInputPath(job, inPath);

Path outPath = new Path(args[1]);
if(outPath.getFileSystem(conf).exists(outPath)) {
outPath.getFileSystem(conf).delete(outPath, true); // recursive delete; the single-argument delete(Path) is deprecated
}
FileOutputFormat.setOutputPath(job, outPath);

// 3. Set the Mapper
job.setMapperClass(MyMapper.class);
job.setOutputKeyClass(Temperature.class);
job.setOutputValueClass(IntWritable.class);

// 4. Set the sort comparator
job.setSortComparatorClass(MyComparator.class);

// 5. Set the partitioner
job.setPartitionerClass(MyPartitioner.class);

// 6. Set the grouping comparator
job.setGroupingComparatorClass(MyGroupingComparator.class);

// 7. Set the number of reduce tasks
job.setNumReduceTasks(2);

// 8. Set the Reducer
job.setReducerClass(MyReducer.class);

System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

Steps 5 and 7, the partitioner and the number of reduce tasks, could be left out; they are only here as a small experiment to see the effect of partitioning.
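With two reduce tasks, MyPartitioner sends each record to partition year % 2, so for the sample data the 1950 records land in part-r-00000 while the 1949 and 1951 records land in part-r-00001.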

Temperature

package org.yifan.highestTemperatures;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class Temperature implements WritableComparable<Temperature> {

private int year;
private int month;
private int day;
private int temp;

public int getYear() {
return year;
}

public void setYear(int year) {
this.year = year;
}

public int getMonth() {
return month;
}

public void setMonth(int month) {
this.month = month;
}

public int getDay() {
return day;
}

public void setDay(int day) {
this.day = day;
}

public int getTemp() {
return temp;
}

public void setTemp(int temp) {
this.temp = temp;
}

@Override
public String toString() {
return year + "-" + month + "-" + day;
}

@Override
public int compareTo(Temperature o) {
int c1 = Integer.compare(this.getYear(), o.getYear());

if(c1 == 0) {
int c2 = Integer.compare(this.getMonth(), o.getMonth());

if(c2 == 0) {
return Integer.compare(this.getDay(), o.getDay());
}
return c2;
}
return c1;
}

@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeInt(this.getYear());
dataOutput.writeInt(this.getMonth());
dataOutput.writeInt(this.getDay());
dataOutput.writeInt(this.getTemp());
}

@Override
public void readFields(DataInput dataInput) throws IOException {
this.setYear(dataInput.readInt());
this.setMonth(dataInput.readInt());
this.setDay(dataInput.readInt());
this.setTemp(dataInput.readInt());
}
}

MyComparator

package org.yifan.highestTemperatures;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/*
 * Year and month ascending, temperature descending
 */

public class MyComparator extends WritableComparator {

public MyComparator() {
super(Temperature.class, true);
}

@Override
public int compare(WritableComparable a, WritableComparable b) {

Temperature t1 = null;
Temperature t2 = null;

t1 = (Temperature)a;
t2 = (Temperature)b;

int c1 = Integer.compare(t1.getYear(), t2.getYear());

if(c1 == 0){
int c2 = Integer.compare(t1.getMonth(), t2.getMonth());

if(c2 == 0) {
return -Integer.compare(t1.getTemp(), t2.getTemp());
}
return c2;
}
return c1;
}
}

MyGroupingComparator

package org.yifan.highestTemperatures;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class MyGroupingComparator extends WritableComparator {
public MyGroupingComparator() {
super(Temperature.class, true);
}

@Override
public int compare(WritableComparable a, WritableComparable b) {

Temperature t1 = null;
Temperature t2 = null;

t1 = (Temperature)a;
t2 = (Temperature)b;

int c1 = Integer.compare(t1.getYear(), t2.getYear());

if(c1 == 0){
return Integer.compare(t1.getMonth(), t2.getMonth());
}
return c1;
}
}

MyPartitioner

package org.yifan.highestTemperatures;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class MyPartitioner extends Partitioner<Temperature, IntWritable> {

@Override
public int getPartition(Temperature temperature, IntWritable intWritable, int i) {
return temperature.getYear() % i;
}
}

Recommending Friends of Friends

The raw data is a list of users and their friends. The first field of each line is a user name; the rest are that user's direct friends. We want to recommend indirect friends to each user, and the more friends two people have in common, the higher the recommendation priority.

world hello hadoop cat
oop hello hive
cat tom hive
mr hive hello
hive cat hadoop world hello mr
hadoop tom hive world
hello tom world hive mr

Problem Analysis

Each line tells us a user's direct friends, and it also tells us that those direct friends are at least indirect friends of one another. If we take the indirect pairs inferred that way and remove the pairs that are actually direct friends, we get the full list of indirect friends. In the Mapper we emit pairwise relationships from each friend list: the Key is the user pair (ordered lexicographically to avoid counting the same pair twice) and the Value marks whether the two are direct or indirect friends. In the Reducer we aggregate the data to get the number of common friends between each pair of users.
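For example, the first line, world hello hadoop cat, makes the Mapper emit (hello:world, 0), (hadoop:world, 0) and (cat:world, 0) for the direct relationships, plus (hadoop:hello, 1), (cat:hello, 1) and (cat:hadoop, 1) for the pairs among world's friends, who are at least indirect friends through world.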


Code

MyMapper

package org.yifan.fof;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.StringUtils;

import java.io.IOException;

public class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

Text friendPair = new Text();
// 0 marks a direct friendship, 1 marks an indirect one
IntWritable val = new IntWritable();

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] words = StringUtils.split(value.toString(), ' ');

for (int i = 1; i < words.length; i++) {
friendPair.set(getFriendPair(words[0], words[i]));
val.set(0);
context.write(friendPair, val);

for(int j = i + 1; j < words.length; j++) {
friendPair.set(getFriendPair(words[i], words[j]));
val.set(1);
context.write(friendPair, val);
}
}
}

private String getFriendPair(String x, String y) {
// Concatenate the two names in lexicographic order
return x.compareTo(y) < 0 ? x + ":" + y : y + ":" + x;
}
}

We first split the line apart. The first user is a direct friend of every user that follows, while the users that follow are pairwise indirect friends of one another; we mark the two cases with Values of 0 and 1 respectively. Note in particular that when concatenating the two user names we must order them lexicographically, otherwise the same pair would be counted twice (as tom:cat and cat:tom, for instance).

MyReducer

package org.yifan.fof;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

int num = 0;

for(IntWritable val : values) {
if(val.get() == 0) {
return;
}

num++;
}

context.write(key, new IntWritable(num));
}
}

The data arriving at the Reducer is already grouped by friend pair. For each Key we only need to check whether any Value is 0: if so, the two users are direct friends and nothing is emitted; if not, counting the 1s tells us how many friends they have in common.
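Take cat:hadoop as an example: neither cat's line nor hadoop's line lists the other, so no 0 is ever emitted for the pair, while world's line and hive's line each contribute a 1. The Reducer therefore outputs cat:hadoop with a value of 2, meaning cat and hadoop share two common friends, world and hive.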

MyTask

package org.yifan.fof;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class MyTask {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

// 1. Configure the job
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);

job.setJarByClass(MyTask.class);
job.setJobName("fof");

// 2. Set the input and output paths
Path inPath = new Path(args[0]);
FileInputFormat.addInputPath(job, inPath);

Path outPath = new Path(args[1]);
if(outPath.getFileSystem(conf).exists(outPath)) {
outPath.getFileSystem(conf).delete(outPath, true);
}
FileOutputFormat.setOutputPath(job, outPath);

// 3. Set the Mapper
job.setMapperClass(MyMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

// 4. Set the Reducer
job.setReducerClass(MyReducer.class);

System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

PageRank Computation

PageRank is an algorithm proposed by Google to measure how important a particular web page is relative to the other pages in a search engine's index. PageRank evaluates pages as follows:

  1. In-links: every page that links to page A increases A's value. In-links are weighted, so a link from a high-quality page is worth more. Initially every page has 1 vote, and a page splits its votes evenly among its out-links;
  2. Iterative computation: because in-links are weighted, updating one page's value changes the value of other pages. The iteration does not run forever: once two successive PR computations differ only slightly, the computation can be considered converged. Usable criteria include an absolute difference (e.g. every page's PR changes by less than 0.0001 compared with the previous round) or a percentage (e.g. 99% of pages keep the same PR as in the previous round);
  3. Damping factor: from a practical standpoint some special sites have to be considered. A site with only out-links and no in-links is essentially a junk site and gets a PR of 0; a site with only in-links and no out-links is an important one, and even with a high PR users may reach it directly via bookmarks or by remembering the domain name. The PR formula is therefore extended with a damping factor: $$PR(p_i) = \frac{1-d}{n} + d\sum_{p_j\in M(p_i)}\frac{PR(p_j)}{L(p_j)}$$, where $$d$$ is the damping factor, $$M(p_i)$$ is the set of pages linking to $$p_i$$, $$L(p_j)$$ is the number of out-links of page $$p_j$$, $$PR(p_j)$$ is the PR value of page $$p_j$$, and $$n$$ is the total number of pages. The damping factor is usually set to 0.85. We will work through one iteration on the sample data below.

The raw data looks like this:

A B D
B C
C A B
D B C

The first element of each line is the voter; the rest are the pages it votes for (links to).
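As a quick check of the formula, take page B in the first iteration, with every page starting at PR = 1.0 and the values hard-coded in the code below (d = 0.85, n = 4). B is linked to by A, C, and D, each of which has 2 out-links, so the weighted sum is 0.5 + 0.5 + 0.5 = 1.5 and $$PR(B) = \frac{1-0.85}{4} + 0.85 \times 1.5 = 0.0375 + 1.275 = 1.3125$$.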


Problem Analysis

PR computation is iterative, so let us first consider a single pass. In each iteration a page's PR value is divided by its number of out-links, and the resulting share is passed to the pages it links to. The Map task therefore has to emit two kinds of KV pairs: in the first kind the Key is the page itself and the Value is its link structure; in the second kind the Key is a linked-to page and the Value is the PR share it receives. In the Reduce task we handle the two kinds of Values separately and finally merge them into one record per page, where the Key is the page and the Value is the new PR value followed by its link structure (the same format the next iteration's Mapper expects).
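As an illustration (assuming the fields are tab-separated, which is what KeyValueTextInputFormat expects; see the note after the driver code), the first-iteration Mapper turns the line for page A, whose out-links are B and D, into three records: (A, 1.0\tB\tD), which carries the current PR plus the link structure, and (B, 0.5) and (D, 0.5), the PR shares passed along each out-link.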


Code

Node

package org.yifan.pageRank;


import org.apache.commons.lang.StringUtils;

import java.io.IOException;
import java.util.Arrays;

public class Node {

private double pageRank = 1.0;
private String[] adjacentNodeNames;

public static final char fieldSeparator = '\t';

public double getPageRank() {
return pageRank;
}

public Node setPageRank(double pageRank) {
this.pageRank = pageRank;
return this;
}

public String[] getAdjacentNodeNames() {
return adjacentNodeNames;
}

public void setAdjacentNodeNames(String[] adjacentNodeNames) {
this.adjacentNodeNames = adjacentNodeNames;
}

public boolean containsAdjacentNodes() {
return adjacentNodeNames != null && adjacentNodeNames.length > 0;
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(pageRank);

if(getAdjacentNodeNames() != null) {
sb.append(fieldSeparator).append(StringUtils.join(getAdjacentNodeNames(), fieldSeparator));
}

return sb.toString();
}

public static Node fromMR(String value) throws IOException {
String[] parts = StringUtils.splitPreserveAllTokens(value, fieldSeparator);

if(parts.length < 1) {
throw new IOException(
"Expected one or more parts but received " + parts.length
);
}

Node node = new Node().setPageRank(Double.parseDouble(parts[0]));
if(parts.length > 1) {
node.setAdjacentNodeNames(Arrays.copyOfRange(parts, 1, parts.length));
}

return node;
}
}

The custom Node type stores a page's information. It is really just an intermediate container: it does not record which page it belongs to, only the page's PR value and its out-links.
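Its text form is the PR value followed by the adjacent page names, all tab-separated. A value such as 1.0\tB\tD therefore round-trips: fromMR() parses it into pageRank = 1.0 and adjacentNodeNames = [B, D], and toString() produces the same string again, which is what lets one iteration's Reducer output feed the next iteration's Mapper.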

PageRankMapper

package org.yifan.pageRank;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class PageRankMapper extends Mapper<Text, Text, Text, Text> {
@Override
protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
// Get the current iteration number
int iter = context.getConfiguration().getInt("iter", 1);

String page = key.toString();
Node node = null;

if(iter == 1) {
node = Node.fromMR("1.0\t"+value.toString());
}else {
node = Node.fromMR(value.toString());
}

context.write(new Text(page), new Text(node.toString()));

if(node.containsAdjacentNodes()) {
double outValue = node.getPageRank() / node.getAdjacentNodeNames().length;
for(int i = 0; i < node.getAdjacentNodeNames().length; i++) {
String outPage = node.getAdjacentNodeNames()[i];
context.write(new Text(outPage), new Text(outValue+""));
}
}
}
}

PageRankReducer

package org.yifan.pageRank;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class PageRankReducer extends Reducer<Text, Text, Text, Text> {
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
double sum = 0.0;
Node sourceNode = null;

for(Text i : values) {
Node node = Node.fromMR(i.toString());
if(node.containsAdjacentNodes()) {
sourceNode = node;
}else {
sum += node.getPageRank();
}
}

// (1 - d) / n + d * sum, with the damping factor d = 0.85 and n = 4 pages hard-coded for this sample
double newPR = (0.15 / 4) + (0.85 * sum);
System.out.println("*********** new pageRank value is " + newPR);
// change relative to the previous iteration; assumes every page appears as a key with its adjacency list
double d = newPR - sourceNode.getPageRank();

// accumulate |change| * 1000 in the global counter so the driver can test for convergence
int j = (int)(d * 1000.0);
j = Math.abs(j);
context.getCounter(MyTask.MyCounter.counter).increment(j);

sourceNode.setPageRank(newPR);
context.write(key, new Text(sourceNode.toString()));
}
}

MyTask

package org.yifan.pageRank;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class MyTask {

// Enum-backed counter used to accumulate the PR change across all tasks
public static enum MyCounter {
counter
}

public static void main(String[] args) throws ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();

double d = 0.0000001; // convergence threshold: average PR change between two iterations
int i = 0;
while(true) {
i++;
try {
conf.setInt("iter", i); // current iteration number

FileSystem fs = FileSystem.get(conf);
Job job = Job.getInstance(conf);
job.setJobName("PR_" + i);
job.setJarByClass(MyTask.class); // lets Hadoop locate the jar containing these classes when run on the cluster
job.setMapperClass(PageRankMapper.class);
job.setReducerClass(PageRankReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);

// By default the Mapper's input key is a LongWritable,
// i.e. the byte offset of the first character of the current line.
// KeyValueTextInputFormat instead splits each line on a separator and uses the first field as the Key and the rest as the Value
job.setInputFormatClass(KeyValueTextInputFormat.class);

Path inputPath = new Path(args[0]);
if(i > 1) {
inputPath = new Path(args[1] + "/pr" + (i - 1));
}
FileInputFormat.addInputPath(job, inputPath);

Path outputPath = new Path(args[1] + "/pr" + i);
if(fs.exists(outputPath)) {
fs.delete(outputPath, true); // recursive delete; the single-argument delete(Path) is deprecated
}
FileOutputFormat.setOutputPath(job, outputPath);

boolean f = job.waitForCompletion(false);

if(f) {
System.out.println("Success.");

long sum = job.getCounters().findCounter(MyCounter.counter).getValue();
System.out.println(sum);
// the counter holds |PR change| * 1000 summed over the 4 sample pages, so dividing by 4000 gives the average change per page
double avg_d = sum / 4000.0;

if(avg_d < d) {
break;
}
}

} catch (Exception e) {
e.printStackTrace();
}
}
}
}

Here I ran into a small bug: KeyValueTextInputFormat did not split the data correctly. It turned out that when I edited the raw data in Intellij, it silently converted the tabs into four spaces. Editing the file in another editor and copying it back solved the problem.


TF-IDF

TF-IDF (Term Frequency - Inverse Document Frequency) is a weighting technique commonly used in information retrieval and text mining. It is a statistical measure of how important a term is to one document in a collection or corpus:

  1. a term's importance is proportional to the number of times it appears in the document;
  2. a term's importance is inversely proportional to how often it appears in the other documents of the corpus.

Various forms of TF-IDF weighting are often used by search engines:

  1. as a measure or ranking of how relevant a document is to a user query;
  2. besides TF-IDF, search engines also use link-analysis-based ranking methods to decide the order in which documents appear in the search results.

TF is the frequency of a given term within a document (a frequency rather than a raw count, which would favor long documents): $$tf_{i,j} = \frac{n_{i,j}}{\sum_k n_{k,j}}$$, where $$n_{i,j}$$ is the number of times term $$t_i$$ appears in document $$d_j$$. IDF is obtained by dividing the total number of documents by the number of documents containing the term and taking the logarithm; it filters out terms that are common but carry little information: $$idf_i = \log\frac{|D|}{|\{j : t_i\in d_j\}|}$$, where $$|D|$$ is the total number of documents and $$|\{j : t_i\in d_j\}|$$ is the number of documents containing $$t_i$$. The TF-IDF score is the product of TF and IDF.
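For example, if a term occurs 3 times in a 100-word document, its TF is 3/100 = 0.03; if it appears in 10 of the 1000 documents in the corpus, its IDF is $$\log\frac{1000}{10}$$, about 4.6 with the natural logarithm that Java's Math.log uses, so its TF-IDF score is roughly 0.03 × 4.6 ≈ 0.14.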

The raw data has the form "ID + '\t' + text content", one document per line.


Code

FirstJob

package org.yifan.tfidf;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FirstJob {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(FirstJob.class); // lets Hadoop ship the jar containing these classes to the cluster

job.setMapperClass(FirstMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

job.setPartitionerClass(FirstPartitioner.class);
job.setCombinerClass(FirstReducer.class);
job.setReducerClass(FirstReducer.class);

job.setNumReduceTasks(4);

Path inPath = new Path(args[0]);
FileInputFormat.addInputPath(job, inPath);

Path outPath = new Path(args[1] + "/firstJob");
FileSystem fs = outPath.getFileSystem(conf);
if(fs.exists(outPath)) {
fs.delete(outPath, true);
}
FileOutputFormat.setOutputPath(job, outPath);
boolean flag = job.waitForCompletion(true);
if(flag) {
System.out.println("First job succeeded!");
}
}
}

FirstMapper

package org.yifan.tfidf;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.wltea.analyzer.core.IKSegmenter;
import org.wltea.analyzer.core.Lexeme;

import java.io.IOException;
import java.io.StringReader;

public class FirstMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
@Override
protected void map(LongWritable key, Text line, Context context) throws IOException, InterruptedException {
String[] words = line.toString().trim().split("\t");
if(words.length >= 2) {
String id = words[0].trim();
String content = words[1].trim();

StringReader sr = new StringReader(content);
IKSegmenter ikSegmenter = new IKSegmenter(sr, true);
Lexeme word;
while((word = ikSegmenter.next()) != null) {
String w = word.getLexemeText();
// emit (term_docId, 1) for every token produced by the IK segmenter
context.write(new Text(w + "_" + id), new IntWritable(1));
}
// emit one "count" record per document; their sum is the total number of documents |D|
context.write(new Text("count"), new IntWritable(1));
}else {
System.out.println(line.toString() + "----------");
}
}
}

FirstReducer

package org.yifan.tfidf;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FirstReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for(IntWritable i : values) {
sum += i.get();
}
context.write(new Text(key), new IntWritable(sum));
}
}

FirstPartitioner

package org.yifan.tfidf;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class FirstPartitioner extends HashPartitioner<Text, IntWritable> {
@Override
public int getPartition(Text key, IntWritable value, int numReduceTasks) {
// send the global document-count key to the last reducer (partition 3)
if(key.equals(new Text("count"))) {
return 3;
}else {
// hash all term_docId keys over the remaining reducers 0..2
return super.getPartition(key, value, numReduceTasks - 1);
}
}
}
}

SecondJob

package org.yifan.tfidf;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class SecondJob {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(SecondJob.class); // lets Hadoop ship the jar to the cluster

job.setMapperClass(SecondMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);

job.setCombinerClass(SecondReducer.class);
// also register it as the reducer; with only a combiner, partial counts from different map tasks would not be merged
job.setReducerClass(SecondReducer.class);

Path inPath = new Path(args[1] + "/firstJob");
FileInputFormat.addInputPath(job, inPath);

Path outPath = new Path(args[1] + "/secondJob");
FileSystem fs = outPath.getFileSystem(conf);
if(fs.exists(outPath)) {
fs.delete(outPath, true);
}
FileOutputFormat.setOutputPath(job, outPath);

boolean flag = job.waitForCompletion(true);
if(flag) {
System.out.println("Second job succeeded");
}
}
}

SecondMapper

package org.yifan.tfidf;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class SecondMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
FileSplit fileSplit = (FileSplit) context.getInputSplit();

// part-r-00003 holds only the global "count" record, so skip it when counting document frequency
if(!fileSplit.getPath().getName().contains("part-r-00003")) {
String[] words = value.toString().trim().split("\t");
if(words.length >= 2) {
String ss = words[0].trim();
String[] split = ss.split("_");
if(split.length >= 2) {
String w = split[0];
context.write(new Text(w), new IntWritable(1));
}
}
}
}
}

SecondReducer

package org.yifan.tfidf;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class SecondReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for(IntWritable i : values) {
sum += i.get();
}
context.write(key, new IntWritable(sum));
}
}

ThirdJob

package org.yifan.tfidf;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class ThirdJob {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(ThirdJob.class); // lets Hadoop ship the jar to the cluster

job.setMapperClass(ThirdMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);

job.setReducerClass(ThirdReducer.class);
// cache the total document count (partition 3 of the first job's output) and the DF table (the second job's output)
job.addCacheFile(new Path(args[1] + "/firstJob/part-r-00003").toUri());
job.addCacheFile(new Path(args[1] + "/secondJob/part-r-00000").toUri());

Path inPath = new Path(args[1] + "/firstJob");
FileInputFormat.addInputPath(job, inPath);

Path outPath = new Path(args[1] + "/thirdJob"); // note the "/": keep the output next to firstJob and secondJob
FileSystem fs = outPath.getFileSystem(conf);
if(fs.exists(outPath)) {
fs.delete(outPath, true);
}
FileOutputFormat.setOutputPath(job, outPath);

boolean flag = job.waitForCompletion(true);
if(flag) {
System.out.println("Third job succeeded!");
}
}
}

ThirdMapper

package org.yifan.tfidf;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.text.NumberFormat;
import java.util.HashMap;
import java.util.Map;

public class ThirdMapper extends Mapper<LongWritable, Text, Text, Text> {
private Map<String, Integer> df = new HashMap<String, Integer>();
private Map<String, Integer> count = new HashMap<String, Integer>();

// Runs once before map(): load the total document count and the DF table from the distributed-cache files
@Override
protected void setup(Context context) throws IOException, InterruptedException {
URI[] cacheFiles = context.getCacheFiles();
if(cacheFiles != null) {
for(int i = 0; i < cacheFiles.length; i++) {
URI uri = cacheFiles[i];
if(uri.getPath().endsWith("part-r-00003")) {
Path path = new Path(uri.getPath());
System.out.println(uri.getPath() + "-" + path.getName());
BufferedReader buffer = new BufferedReader(new FileReader(path.getName()));
String line = buffer.readLine();
if(line.startsWith("count")) {
String[] split = line.split("\t");
count.put(split[0], Integer.valueOf(split[1].trim()));
}
buffer.close();
}else if(uri.getPath().endsWith("part-r-00000")) {
Path path = new Path(uri.getPath());
BufferedReader buffer = new BufferedReader(new FileReader(path.getName()));

String line;
while((line = buffer.readLine()) != null) {
String[] split = line.split("\t");
df.put(split[0], Integer.parseInt(split[1].trim()));
}
buffer.close();
}
}
}
}

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
FileSplit fileSplit = (FileSplit) context.getInputSplit();
if(!fileSplit.getPath().getName().contains("part-r-00003")) {
String[] split = value.toString().trim().split("\t");
if(split.length >= 2) {
// the raw count from the first job is used directly as TF (not normalized by document length)
int TF = Integer.parseInt(split[1]);

String[] ss = split[0].split("_");
if(ss.length >= 2) {
String word = ss[0];
String id = ss[1];

// look up the actual term (not the literal "word") and divide in floating point before taking the log
double TF_IDF = TF * Math.log((double) count.get("count") / df.get(word));

NumberFormat f = NumberFormat.getInstance();
f.setMaximumFractionDigits(5);

context.write(new Text(id), new Text(word + "_" + f.format(TF_IDF)));
}
}
}
}
}

ThirdReducer

package org.yifan.tfidf;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class ThirdReducer extends Reducer<Text, Text, Text, Text> {
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
StringBuffer sb = new StringBuffer();
for(Text text : values) {
sb.append(text.toString()).append("\t");
}
context.write(key, new Text(sb.toString()));
}
}