MapReduce-Hadoop分布式计算模型

nbd2 9年前

MapReduce概述

MapReduce是一种分布式计算模型,由Google提出,主要用于搜索领域,解决海量数据的计算问题。

MR由两个阶段组成:Map和Reduce,用户只需要实现map()和reduce()两个函数,即可实现分布式计算,非常简单。这两个函数的形参是key、value对,表示函数的输入信息。

MapReduce实现原理

 MapReduce-Hadoop分布式计算模型 ddd
执行步骤:
1.map任务处理

1.1 读取输入文件内容,解析成key、value对。对输入文件的每一行,解析成key、value对。每一个键值对调用次map函数。

1.2 写自己的逻辑,对输入的key、value处理,转换成新的key、value输出。

1.3 对输出的key、value进行分区。

1.4 对不同分区的数据,按照key进行排序、分组。相同key的value放到一个集合中。

1.5 (可选)分组后的数据进行归约。

2.reduce任务处理

2.1 对多个map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点。

2.2 对多个map任务的输出进行合并、排序。写reduce函数自己的逻辑,对输入的key、value处理,转换成新的key、value输出。

2.3 把reduce的输出保存到文件中。

序列化

在MapReduce中,序列化是一个很重要的步骤。

序列化就是把结构化的对象转化为字节流。

反序列化就是把字节流转回结构化对象。

hadoop中的Partitioner分区

Hadoop中的MapReduce支持对key进行分区,从而可以使map出来的数据均匀分布在reduce上。
框架自带了一个默认的分区类,HashPartitioner,先看看这个类,就知道怎么自定义key分区了,
public class HashPartitioner<K, V> extends Partitioner<K, V> {       /** Use {@link Object#hashCode()} to partition. */     public int getPartition(K key, V value,                             int numReduceTasks) {       return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;     }     }

先解释一下这个HashPartitioner做的事情,
(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;

将key均匀分布在ReduceTasks上,举例如果Key为Text的话,Text的hashcode方法跟String的基本一致,都是采用 的Horner公式计算,得到一个int,string太大的话这个int值可能会溢出变成负数,所以与上Integer.MAX_VALUE(即 0111111111111111),然后再对reduce个数取余,这样就可以让key均匀分布在reduce上。

实现分区的步骤:

  1. 先分析一下具体的业务逻辑,确定大概有多少个分区
  2. 首先书写一个类,它要继承org.apache.hadoop.mapreduce.Partitioner这个类
  3. 重写public int getPartition这个方法,根据具体逻辑,读数据库或者配置返回相同的数字
  4. 在main方法中设置Partioner的类,job.setPartitionerClass(DataPartitioner.class);
  5. 设置Reducer的数量,job.setNumReduceTasks(6);
举例,
public static class ProviderPartitioner extends Partitioner<Text, DataBean> {      @Override    public int getPartition(Text key, DataBean value, int arg2) {          String account = key.toString();     String sub_acc = account.substring(0,3);     Integer code = 0;     if(sub_acc.equals("aaa")){            code = 1;     }     return code;    }    }

MapReduce中的Combiners编程

每一个map可能会产生大量的输出,combiner的作用就是在map端对输出先做一次合并,以减少传输到reducer的数据量。

combiner最基本是实现本地key的归并,combiner具有类似本地的reduce功能。

如果不用combiner,那么,所有的结果都是reduce完成,效率会相对低下。使用combiner,先完成的map会在本地聚合,提升速度。

注意:Combiner的输出是Reducer的输入,如果Combiner是可插拔的,添加Combiner绝不能改变最终的计算结果。所以 Combiner只应该用于那种Reduce的输入key/value与输出key/value类型完全一致,且不影响最终结果的场景。比如累加,最大值 等。
举例:
public static class Combine extends Reducer<Text,Text,Text,Text> {                      // Reduce Method           public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {               double sum = 0;               int count = 0;               for (Text value : values) {                   String fields[] = value.toString().split(",");                   sum += Double.parseDouble(fields[0]);                   count += Integer.parseInt(fields[1]);               }               context.write(key, new Text(sum+","+count));           }       }

在main方法中设置Combiner的类,

1 
job.setCombinerClass(Combine.class); 

Shuffle-MapReduce的核心

首先让我们看一下下面这张图,
 MapReduce-Hadoop分布式计算模型 ddd
Mapper处理过程:

  1. 一个输入切片对应一个Mapper, 也就是一个Mapper任务读取文件的一部分;
  2. 每一个Mapper都会对应一个环形缓冲区,用来存储Mapper的输出,默认大小100MB(io.sort.mb属性),一旦达到阀值0.8(io.sort.spill.percent),一个后台线程把内容写到(spill)磁盘的指定目录(mapred.local.dir)下的新建的一个溢出写文件;
  3. 在写入磁盘之前要对数据进行分区、排序;
  4. 等最后记录写完,合并全部溢出写文件为一个分区且排序的文件。

Reducer处理过程:

  1. Reducer通过Http方式得到输出文件的分区,每个Reducer会取相对应分区的数据;
  2. Reducer取到数据之后,首先会进行排序,之后合并过的数据会再一此进行排序;
  3. 排序阶段合并map输出,然后走Reduce阶段。

总结

MapReduce是一种编程模型,用于大规模数据集(大于1TB)的并行运算。它极大地方便了编程人员在不会分布式并行编程的情况下,将自己的程序运行在分布式系统上。

转自:http://longliqiang88.github.io/2015/07/12/MapReduce-Hadoop%E5%88%86%E5%B8%83%E5%BC%8F%E8%AE%A1%E7%AE%97%E6%A8%A1%E5%9E%8B/