Two ways to implement join operations in MapReduce

Is a shuffle phase always necessary?

In the previous post, Hive如何让MapReduce实现SQL操作 (how Hive turns SQL into MapReduce), we described how Hive implements a join: the data is loaded in the map phase, goes through the shuffle phase, and the actual join is performed in the reduce phase. We call this a reduce join.

We know that shuffle is one of the most time-consuming and performance-costly steps in the whole processing pipeline. When one of the datasets is small, is a full shuffle really necessary? Definitely not: wherever shuffle can be avoided, it should be avoided. We can load the small table into the distributed cache, and then, while scanning the large table, compare each record we read against the cached data to see whether the join-on key matches. We call this a map join.

There are two tables:

Employee table: emp.txt

Department table: dept.txt
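
Both files are plain tab-separated text. Judging from how the mappers below parse them, the layout is assumed to be the classic emp/dept schema (only deptno, dname, ename and sal are confirmed by the code; the remaining column names are an assumption):

dept.txt: deptno \t dname \t loc                                              (3 columns)
emp.txt:  empno \t ename \t job \t mgr \t hiredate \t sal \t comm \t deptno   (8 columns)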

The SQL statement is:

SELECT d.dname, e.ename, e.sal FROM dept d JOIN emp e ON (d.deptno = e.deptno);

Implementation of reduce join

In the map phase, the map function reads both files, File1 and File2. To distinguish key/value pairs coming from the two sources, each record is given a tag: for example, tag=0 means the record comes from File1 and tag=2 means it comes from File2. In other words, the main job of the map phase is to tag records from the different files. In the reduce phase, the reduce function receives, for each key, the value lists coming from File1 and File2, and joins the File1 records with the File2 records for that key (a Cartesian product). That is, the actual join happens in the reduce phase.

package me.jiangs.bigdata.hadoop.mr.join;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * Author: Michael PK
 */
public class ReduceJoinApp {

    public static void main(String[] args) throws Exception {

        Configuration configuration = new Configuration();

        Job job = Job.getInstance(configuration);
        job.setJarByClass(ReduceJoinApp.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(DataInfo.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // Both files go through the same mapper; records are identified by their column count
        MultipleInputs.addInputPath(job, new Path("input/join/input/emp.txt"), TextInputFormat.class);
        MultipleInputs.addInputPath(job, new Path("input/join/input/dept.txt"), TextInputFormat.class);

        // Delete the output directory if it already exists
        Path outputDir = new Path("input/join/output");
        outputDir.getFileSystem(configuration).delete(outputDir, true);
        FileOutputFormat.setOutputPath(job, outputDir);

        job.waitForCompletion(true);
    }

    public static class MyMapper extends Mapper<LongWritable, Text, IntWritable, DataInfo> {

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            String[] splits = value.toString().split("\t");
            int length = splits.length;

            if (length == 3) { // dept: deptno, dname, loc
                int deptno = Integer.parseInt(splits[0]);
                String dname = splits[1];

                DataInfo dataInfo = new DataInfo();
                dataInfo.setData(dname);
                dataInfo.setFlag("d"); // tag: record comes from dept.txt

                context.write(new IntWritable(deptno), dataInfo);
            } else if (length == 8) { // emp: 8 columns, deptno is the last field
                String ename = splits[1];
                String sal = splits[5];
                int deptno = Integer.parseInt(splits[7]);

                DataInfo dataInfo = new DataInfo();
                dataInfo.setFlag("e"); // tag: record comes from emp.txt

                StringBuilder builder = new StringBuilder();
                builder.append(ename).append("\t")
                        .append(sal).append("\t");
                dataInfo.setData(builder.toString());

                context.write(new IntWritable(deptno), dataInfo);
            }
        }
    }

    public static class MyReducer extends Reducer<IntWritable, DataInfo, Text, NullWritable> {
        @Override
        protected void reduce(IntWritable key, Iterable<DataInfo> values, Context context) throws IOException, InterruptedException {

            List<String> emps = new ArrayList<>();
            List<String> depts = new ArrayList<>();

            // Separate the values for this deptno according to their tag
            for (DataInfo dataInfo : values) {
                if ("e".equals(dataInfo.getFlag())) { // emp
                    emps.add(dataInfo.getData());
                } else if ("d".equals(dataInfo.getFlag())) { // dept
                    depts.add(dataInfo.getData());
                }
            }

            // Cartesian product of the two lists: the actual join
            for (int i = 0; i < emps.size(); i++) {
                for (int j = 0; j < depts.size(); j++) {
                    context.write(new Text(emps.get(i) + "\t" + depts.get(j)), NullWritable.get());
                }
            }
        }
    }
}
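
The DataInfo class used as the map output value is not shown in the post. Below is a minimal sketch of what it presumably looks like, assuming a plain Hadoop Writable carrying the data payload and the source flag; only the getter/setter names come from the code above, everything else is an assumption.

package me.jiangs.bigdata.hadoop.mr.join;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Hypothetical reconstruction of the DataInfo value class referenced by ReduceJoinApp
public class DataInfo implements Writable {

    private String data = ""; // ename + sal (emp) or dname (dept)
    private String flag = ""; // "e" for emp records, "d" for dept records

    public DataInfo() {
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(data);
        out.writeUTF(flag);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.data = in.readUTF();
        this.flag = in.readUTF();
    }

    public String getData() { return data; }
    public void setData(String data) { this.data = data; }
    public String getFlag() { return flag; }
    public void setFlag(String flag) { this.flag = flag; }
}

Whatever its exact shape, a class used as a map output value must implement Writable and provide a no-argument constructor so the framework can deserialize it during the shuffle.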

Implementation of map join

Reduce join exists because the map phase cannot see all of the fields needed for the join: the records sharing a key may be scattered across different map tasks. Reduce join is quite inefficient, because the shuffle phase has to transfer a large amount of data. Map join is an optimization for the following scenario: of the two tables being joined, one is very large and the other is small enough to fit entirely in memory. We can then ship a copy of the small table to every map task (for example, load it into a hash table) and scan only the large table: for each key/value record of the large table, look up the key in the hash table, and if there is a match, emit the joined record.

package me.jiangs.bigdata.hadoop.mr.join;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

/**
 * Author: Michael PK
 */
public class MapJoinApp {

    public static void main(String[] args) throws Exception {

        Configuration configuration = new Configuration();

        Job job = Job.getInstance(configuration);
        job.setJarByClass(MapJoinApp.class);
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setNumReduceTasks(0); // map-only job: no reduce, hence no shuffle

        // Add the small file (dept.txt) to the distributed cache
        job.addCacheFile(new URI("/Users/rocky/IdeaProjects/hadoop-train-v2/input/join/input/dept.txt"));
        // Only the big table (emp.txt) is used as the job input
        FileInputFormat.setInputPaths(job, new Path("input/join/input/emp.txt"));

        // Delete the output directory if it already exists
        Path outputDir = new Path("input/join/mapoutput");
        outputDir.getFileSystem(configuration).delete(outputDir, true);
        FileOutputFormat.setOutputPath(job, outputDir);

        job.waitForCompletion(true);
    }

    public static class MyMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

        // deptno -> dname, built once per mapper from the cached dept.txt
        private static Map<Integer, String> cache = new HashMap<>();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Load the cached small table into memory before any map() call
            String path = context.getCacheFiles()[0].toString();
            try (BufferedReader reader = new BufferedReader(new FileReader(path))) {
                String readLine;
                while ((readLine = reader.readLine()) != null) {
                    String[] splits = readLine.split("\t"); // dept: deptno, dname, loc
                    int deptno = Integer.parseInt(splits[0]);
                    String dname = splits[1];

                    cache.put(deptno, dname);
                }
            }
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            String[] splits = value.toString().split("\t");
            int length = splits.length;

            if (length == 8) { // emp: 8 columns, deptno is the last field
                String ename = splits[1];
                String sal = splits[5];
                int deptno = Integer.parseInt(splits[7]);

                // The join itself: look up the department name in the in-memory cache
                String dname = cache.get(deptno);

                StringBuilder builder = new StringBuilder();
                builder.append(ename).append("\t")
                        .append(sal).append("\t")
                        .append(dname);

                context.write(new Text(builder.toString()), NullWritable.get());
            }
        }
    }
}
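
Note that the small file is added to the cache with a local absolute path, which is convenient when running the job locally from an IDE. On a real cluster the small table would normally live on HDFS; the framework copies it to each task node and the mapper reads the localized copy, so the lookup logic in setup() stays the same.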
