MapReduce-Hadoop分布式计算模型
MapReduce概述
MapReduce是一种分布式计算模型,由Google提出,主要用于搜索领域,解决海量数据的计算问题。
MR由两个阶段组成:Map和Reduce,用户只需要实现map()和reduce()两个函数,即可实现分布式计算,非常简单。这两个函数的形参是key、value对,表示函数的输入信息。
MapReduce实现原理
1.1 读取输入文件内容,解析成key、value对。对输入文件的每一行,解析成key、value对。每一个键值对调用次map函数。
1.2 写自己的逻辑,对输入的key、value处理,转换成新的key、value输出。
1.3 对输出的key、value进行分区。
1.4 对不同分区的数据,按照key进行排序、分组。相同key的value放到一个集合中。
1.5 (可选)分组后的数据进行归约。
2.reduce任务处理
2.1 对多个map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点。
2.2 对多个map任务的输出进行合并、排序。写reduce函数自己的逻辑,对输入的key、value处理,转换成新的key、value输出。
2.3 把reduce的输出保存到文件中。
序列化
在MapReduce中,序列化是一个很重要的步骤。
序列化就是把结构化的对象转化为字节流。
反序列化就是把字节流转回结构化对象。
hadoop中的Partitioner分区
Hadoop中的MapReduce支持对key进行分区,从而可以使map出来的数据均匀分布在reduce上。框架自带了一个默认的分区类,HashPartitioner,先看看这个类,就知道怎么自定义key分区了,
public class HashPartitioner<K, V> extends Partitioner<K, V> { /** Use {@link Object#hashCode()} to partition. */ public int getPartition(K key, V value, int numReduceTasks) { return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; } }
先解释一下这个HashPartitioner做的事情,
(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
将key均匀分布在ReduceTasks上,举例如果Key为Text的话,Text的hashcode方法跟String的基本一致,都是采用 的Horner公式计算,得到一个int,string太大的话这个int值可能会溢出变成负数,所以与上Integer.MAX_VALUE(即 0111111111111111),然后再对reduce个数取余,这样就可以让key均匀分布在reduce上。
实现分区的步骤:
- 先分析一下具体的业务逻辑,确定大概有多少个分区
- 首先书写一个类,它要继承org.apache.hadoop.mapreduce.Partitioner这个类
- 重写public int getPartition这个方法,根据具体逻辑,读数据库或者配置返回相同的数字
- 在main方法中设置Partioner的类,job.setPartitionerClass(DataPartitioner.class);
- 设置Reducer的数量,job.setNumReduceTasks(6);
public static class ProviderPartitioner extends Partitioner<Text, DataBean> { @Override public int getPartition(Text key, DataBean value, int arg2) { String account = key.toString(); String sub_acc = account.substring(0,3); Integer code = 0; if(sub_acc.equals("aaa")){ code = 1; } return code; } }
MapReduce中的Combiners编程
每一个map可能会产生大量的输出,combiner的作用就是在map端对输出先做一次合并,以减少传输到reducer的数据量。
combiner最基本是实现本地key的归并,combiner具有类似本地的reduce功能。
如果不用combiner,那么,所有的结果都是reduce完成,效率会相对低下。使用combiner,先完成的map会在本地聚合,提升速度。
注意:Combiner的输出是Reducer的输入,如果Combiner是可插拔的,添加Combiner绝不能改变最终的计算结果。所以 Combiner只应该用于那种Reduce的输入key/value与输出key/value类型完全一致,且不影响最终结果的场景。比如累加,最大值 等。举例:
public static class Combine extends Reducer<Text,Text,Text,Text> { // Reduce Method public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { double sum = 0; int count = 0; for (Text value : values) { String fields[] = value.toString().split(","); sum += Double.parseDouble(fields[0]); count += Integer.parseInt(fields[1]); } context.write(key, new Text(sum+","+count)); } }
在main方法中设置Combiner的类,
1 | job.setCombinerClass(Combine.class); |
Shuffle-MapReduce的核心
首先让我们看一下下面这张图,
ddd
Mapper处理过程:
- 一个输入切片对应一个Mapper, 也就是一个Mapper任务读取文件的一部分;
- 每一个Mapper都会对应一个环形缓冲区,用来存储Mapper的输出,默认大小100MB(io.sort.mb属性),一旦达到阀值0.8(io.sort.spill.percent),一个后台线程把内容写到(spill)磁盘的指定目录(mapred.local.dir)下的新建的一个溢出写文件;
- 在写入磁盘之前要对数据进行分区、排序;
- 等最后记录写完,合并全部溢出写文件为一个分区且排序的文件。
Reducer处理过程:
- Reducer通过Http方式得到输出文件的分区,每个Reducer会取相对应分区的数据;
- Reducer取到数据之后,首先会进行排序,之后合并过的数据会再一此进行排序;
- 排序阶段合并map输出,然后走Reduce阶段。
总结
MapReduce是一种编程模型,用于大规模数据集(大于1TB)的并行运算。它极大地方便了编程人员在不会分布式并行编程的情况下,将自己的程序运行在分布式系统上。
转自:http://longliqiang88.github.io/2015/07/12/MapReduce-Hadoop%E5%88%86%E5%B8%83%E5%BC%8F%E8%AE%A1%E7%AE%97%E6%A8%A1%E5%9E%8B/