大数据计算框架MapReduce的实现原理

MapReduce的原理,简单来说就是,在Map阶段为每个数据块分配一个Map计算任务,然后将所有map输出的Key进行合并,相同的Key及其对应的Value发送给同一个Reduce任务去处理。对于大数据工程师来说,只要遵循MapReduce编程模型就可以开发出复杂的大数据计算程序。

那么问题来了,这个程序是如何在分布式集群中运行起来的?程序又是如何找到相应的数据并进行计算的?这就需要MapReduce计算框架来完成。

首先,在这个过程中有两个关键问题需要处理。

  • 如何为每个数据块分配一个Map计算任务,也就是代码如何发送到数据块所在服务器的,发送后是如何启动的,启动以后如何知道自己需要计算的数据在文件什么位置。

  • 处于不同服务器的map输出的,如何把相同的Key聚合在一起发送给Reduce任务进行处理。

还是举个例子,假设有两个数据块的文本数据需要进行词频统计,MapReduce计算过程如下图所示,其中标红的两处就是对应的上面两个关键问题,需要“MapReduce框架处理”,具体来说,分别就是MapReduce作业启动和运行,以及MapReduce数据合并与连接。

MapReduce作业启动和运行机制

MapReduce运行过程设计三类关键进程。

  • 1.大数据应用进程。
    这类进程是启动MapReduce程序的主入口,主要是指定Map和Reduce类、输入输出文件路径等,并提交作业给Hadoop集群,也就是下面提到的JobTracker进程。这是由用户启动的MapReduce程序进程。

  • 2.JobTracker进程。
    这类进程根据要处理的输入数据量,命令下面提到的TaskTracker进程启动相应数量的Map和Reduce进程任务,并管理整个作业生命周期的任务调度和监控。这是Hadoop集群的常驻进程,要注意的是,JobTracker进程在整个Hadoop集群全局唯一。

  • 3.TaskTracker进程。这个进程负责启动和管理Map进程以及Reduce进程。因为需要每个数据块都有对应的map函数,TaskTracker进程通常和HDFS的DataNode进程启动在同一个服务器。也就是说,Hadoop集群中绝大多数服务器同时运行DataNode进程和TaskTracker进程。

JobTracker进程和TaskTracker进程是主从关系,主服务器通常只有一台(或者另有一台备机提供高可用服务,但运行时只有一台服务器对外提供服务,真正起作用的只有一台),从服务器可能有几百上千台,所有的从服务器听主服务器的控制和调度安排。主服务器负责为应用程序分配服务器资源以及作业执行的调度,而具体的计算操作则在从服务器上完成。

具体来看,MapReduce的主服务器就是JobTracker,从服务器就是TaskTracker。其实HDFS也是主从架构,它的主服务器是NameNode,从服务器是DataNode。类似的,Yarn, Spark等也都是这样的架构,这种一主多从的服务器架构也是绝大多数大数据系统的架构方案。

可重复使用的架构方案叫作架构模式,一主多从可谓是大数据领域最主要的架构模式。主服务器只有一台,掌控全局;从服务器很多台,负责具体的事情。这样很多台服务器就可以有效组织起来,对外表现出一个统一又强大的计算能力。

了解了上面的内容,再看下面这张图,就可以一步一步感受一下整个流程。

真个流程可以概括如下:

  • 1.应用进程JobClient将用户作业JAR包存储在HDFS中,将来这些JAR包会分发给Hadoop集群中的服务器执行MapReduce计算。

  • 2.应用程序提交job作业给JobTracker。

  • 3.JobTracker根据作业调度策略创建JobInProcess树,每个作业都会有一个自己的JobInProcess树。

  • 4.JobInProcess根据输入数据分片数目(通常就是数据块的数目)和设置的Reduce数目创建相应数量的TaskInProcess。

  • 5.TaskTracker进程和JobTracker进程进行定时通信。

  • 6.如果TaskTracker有空闲的计算资源(有空闲CPU核心),JobTracker就会给它分配任务。分配任务的时候回根据TaskTracker的服务器名字匹配在同一台机器上的数据块计算任务给它,使启动计算任务正好处理本机上的数据,以实现“移动计算比移动数据更划算”。

  • 7.TaskTracker收到任务后根据任务类型(是Map还是Reduce)和任务参数(作业JAR路径、输入数据文件路径、要处理的数据在文件中的起始位置和偏移量、数据块多个备份的DataNode主机名等),启动相应的Map或者Reduce进程。

  • 8.Map或者Reduce进程启动后,检查本地是否有要执行任务的JAR包文件,如果没有就去HDFS上下载,然后加载Map或者Reduce代码开始执行。

  • 9.如果是Map进程,从HDFS读取数据(通常要读取的数据块正好存储在本机);如果是Reduce进程,将结果写出到HDFS。

MapReduce数据合并与连接机制

MapReduce真正产生奇迹的地方是数据的合并与连接。

几乎所有的大数据计算场景都要处理数据关联的问题,像WordCount这种比较简单的只要对Key进行合并就可以了,对于像数据库的join操作这种比较复杂的,需要对两种类型(或者更多类型)的数据根据Key进行连接。

在map输出与reduce输入之间,MapReduce计算框架处理数据合并与连接操作,这个操作有个专门的词汇叫shuffle。shuffle的具体过程请看下图:

每个Map任务的计算结果都会写入到本地文件系统,等Map任务快要计算完成的时候,MapReduce计算框架会启动shuffle过程,在Map任务进程调用一个Partitioner接口,对Map产生的每个进行Reduce分区选择,然后通过HTTP通信发送给对应的Reduce进程。这样不管Map位于哪个服务器节点,相同的Key一定会被发送给相同的Reduce进程。Reduce任务进程对收到的进行排序和合并,相同的Key放在一起,组成一个传递给Reduce执行。

map输出的shuffle到哪个Reduce进程是这里的关键,它是有Partitioner来实现,MapReduce框架默认的Partitioner用Key的哈希值对Reduce任务数量取模,相同的Key一定会落在相同的Reduce任务ID上。从实现上来看,这样的Partitioner代码只需要一行。

1
2
3
4
 /** Use {@link Object#hashCode()} to partition. */ 
public int getPartition(K key, V value, int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}

关于shuffle,只要记住一点:分布式计算需要将不同服务器上的相关数据合并到一起进行下一步计算,这就是shuffle

MapReduce编程相对来说是简单的,但是MapReduce框架要将一个相对简单的程序在分布式的大规模服务器集群上并行执行起来却并不简单。

0%