大数据编程模型——MapReduce

MapReduce是一种非常简单又非常强大的编程模型。

简单在于其编程模型只包含Map和Reduce两个过程,map的主要输入是一对值,经过map计算后输出一对值;然后将相同Key合并,形成;再将这个输入reduce,经过计算输出零个或多个对。

同时,MapReduce又是非常强大的,不管是代数运算(SQL计算),还是矩阵运算(图计算),大数据领域几乎所有的计算需求都可以通过MapReduce编程实现。

以WordCount程序为例,看下MapReduce计算过程。

WordCount 主要解决的是文本处理中词频统计的问题,就是统计文本中每一个单词出现的次数。如果只是统计一篇文章的词频,及时KB到几MB的数据,只需写一个程序,将数据读入内存,建一个Hash表记录每个词出现的次数就可以了。这个统计过程可以看下面这张图。

如果用Python语言,单机处理WordCount的代码是这样的:

1
2
3
4
5
6
7
8
9
# 文本前期处理
str_list = str.replace('\n', '').lower().split(' ')
count_dict = {}
# 如果字典里有该单词则加1,否则添加入字典
for str in str_list:
if str in count_dict.keys():
count_dict[str] = count_dict[str] + 1
else:
count_dict[str] = 1

简单来说,就是建一个Hash表,然后将字符串里的每个词放到这个Hash表里。如果这个词第一次放到Hash表,就新建一个Key、Value对,Key是这个词,Value是1。如果Hash表里已经有这个词了,那么就给这个词的Value+1。

小数据量用单机统计词频很简单,但是如果想统计全世界互联网所有网页(数万亿计)的词频数,而这正是Google这样的搜索引擎的典型需求,不可能写一个程序把全世界的网页都读入内存,这时候就需要用MapReduce编程来解决。

WordCount的MapReduce程序如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class WordCount {
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
/* Text, IntWritable为Hadoop提供可优化网络序列化传输的基本类型
* Text相当于Java中的String类型
* IntWritable相当于Java中的Integer类型
*/
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();

public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}

public static class IntSumReducer extends Reducer<Text, IntWritable, Text,, IntWritable> {

private IntWritable result = new IntWritable();

public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
}

从这段代码中看到,MapReduce版本WordCount程序的核心是一个map函数和一个reduce函数。

map函数的输入主要是一个对,在这个例子里,Value是要统计的素有文本行中的一行数据,Key是文件中的行偏移量,在一般计算中都不会用到。

1
public void map(Object key, Text value, Context context)

map函数的计算过程是,将这行文本中的单词提取出来,针对每个单词输出一个这样的对。

MapReduce计算框架会将这些收集起来,将相同的word放在一起,形成>这样的数据,然后将其输入给reduce函数。

1
public void reduce(Text key, Iterable<IntWritable> values, Context context)

这里reduce的输入参数Values就是由很多个1组成的集合,而Key就是具体的单词word。

reduce函数的计算过程是,将这个集合里的1求和,再将单词(word)和这个和(sum)组成一个,也就是输出。每一个输出就是一个单词和它的词频统计总和。

一个map函数可以针对一部分数据进行运算,这样就可以将一个大数据切分成很多块(这也正是HDFS所做的),MapReduce计算框架为每个数据块分配一个map函数去计算,从而实现大数据的分布式计算。

0%