使用mapreduce计算环比的实例

Smith 9年前

来自: http://www.cnblogs.com/sharpxiajun/p/5205496.html

最近做了一个小的mapreduce程序,主要目的是计算环比值最高的前5名,本来打算使用spark计算,可是本人目前spark还只是简单看了下,因此就先改用mapreduce计算了,今天和大家分享下这个例子,也算是对自己写的程序的总结了。

首先解释下环比,例如我们要算本周的环比,那么计算方式就是本周的数据和上周数字的差值除以上周数值就是环比了,如果是月的环比就是本月和上月数据的差值除以上月数字就是本月环比了。不过本mapreduce实例不会直接算出比值,只是简单求出不同时间段数值的差值,最终环比结果由业务系统进行运算了。

下面看看本人构造的测试数据了,测试数据分成两个文件,文件一的内容如下所示:

guanggu,1;90  hongshan,1;80  xinzhou,1;70  wuchang,1;95  hankou,1;85  hanyang,1;75

第二个文件的测试数据如下:

guanggu,2;66  hongshan,2;68  xinzhou,2;88  wuchang,2;59  hankou,2;56  hanyang,2;38

这里每行第一列的字段就是key了,key和value使用逗号分割,1;90是value值,value值包含两个内容,1为时间段标记,90就是数值,大家可以看到同一个key会有两个不同的时间段(使用1和2来标记)。

Mapreduce的运算逻辑如下:首先第一步我们要求出环比数值,第二步就是排序了,做这个算法我曾考虑许久就是想把求环比值和排序两个过程合并,但是最后发现很难做到,只好将整个运算过程拆分成两个不同mapreduce,第一个mapreduce计算环比,第二个进行排序,二者是迭代关系。这里解释下分成两个mapreduce原因吧,主要原因就是最原始数据很难把两个不同时间段的数据按照key合并在一起变成一行数据,因此mapreduce计算时候必须有一个过程就是执行相同key合并操作,因此不得不分成两个步骤完成计算。

接下来就是具体代码了,首先是第一个mapreduce,用来计算环比值的mapreduce了,它的map实现代码如下:

import java.io.IOException;    import org.apache.hadoop.io.Text;  // 使用输入为object,text,输出为Text,Text的数据结构,Object其实是行号,在本计算里意义不大,Text就是每行的内容  public class MrByAreaHBMap extends org.apache.hadoop.mapreduce.Mapper<Object, Text, Text, Text>{            private static String firstSeparator = ",";//每行的key和value值使用逗号分割        @Override      protected void map(Object key, Text value, Context context)              throws IOException, InterruptedException {          /* 本map的逻辑非常简单,就是从行里拆分key和value,对于有些初学者可能疑惑,我们到底如何让相同的key合并在一起了?这个就要看reduce计算了*/                  Text areaKey = new Text();// reduce输入是Text类型          Text areaVal = new Text();// reduce输入是Text类型          String line = value.toString();          if (line != null && !line.equals("")){              String[] arr = line.split(firstSeparator);                areaKey.set(arr[0]);              areaVal.set(arr[1]);                            context.write(areaKey, areaVal);          }        }    }

下面是reduce代码了,具体如下:

import java.io.IOException;    import org.apache.hadoop.io.Text;  import org.apache.hadoop.mapreduce.Reducer;    public class MrByAreaHBReduce extends Reducer<Text, Text, Text, Text>{            private static String firstSeparator = ";";      private static String preFlag = "1";      private static String nextFlag = "2";        /*reduce的输入也是key,value的形式,不过这个输入是会将map里相同的key的值进行合并,合并形式就是一个数组形式,不过reduce方法里是通过迭代器进行数值处理*/      @Override      protected void reduce(Text key, Iterable<Text> values, Context context)              throws IOException, InterruptedException {          int num1 = 0,num2 = 0,hbNum = 0;          for(Text value : values){              String inVal = value.toString();              String[] arr = inVal.split(firstSeparator);              // 下面的逻辑是通过不同时间段标记获取不同时间段数值              if (arr[0].equals(preFlag)){                  num1 = Integer.valueOf(arr[1]);              }              if (arr[0].equals(nextFlag)){                  num2 = Integer.valueOf(arr[1]);              }          }          hbNum = num1 - num2;// 这里计算环比          Text valueText = new Text();          valueText.set(hbNum + "");          Text retKey = new Text();          /* 对reduce的key进行了修改,将原来key和各个时间段的数值合并在一起,这样读取计算结果时候就可以读取到原始计算数据了,这是key,value计算模式可以简陋的无奈之举*/          retKey.set(key.toString() + firstSeparator + num1 + firstSeparator + num2);          context.write(valueText,retKey);      }    }

求环比的mapredue代码介绍结束了,下面是排序的算法,排序算法更加简单,在计算环比的mapreduce输出里我将环比值和原始key进行了互换,然后输出到结果文件里,这个结果文件就是第二个mapreduce的输入了,下面我们就要对这个新key进行排序,mapredcue计算模型里从map到reduce有默认的排序机制,如果map输出的key是字符类型那么排序规则就是按照字典进行排序,如果key是数字,那么就会按照数字由小到大进行排序,下面就是排序mapreduce的具体算法,map代码如下:

import java.io.IOException;    import org.apache.hadoop.io.IntWritable;  import org.apache.hadoop.io.LongWritable;  import org.apache.hadoop.io.Text;  import org.apache.hadoop.mapreduce.Mapper;    public class MrByAreaSortMap extends          Mapper<LongWritable, Text, IntWritable, Text> {      /* 我们需要的排序是按照key的数值排序,不过这个排序是map的输出才做的,因此代码里输出的key使用了IntWritable类型          其实排序的map逻辑非常简单就是保证map的输出key是数字类型即可      */      @Override      protected void map(LongWritable key, Text value, Context context)              throws IOException, InterruptedException {          String line = value.toString();          /*reduce的输出结果文件格式是按照空格分隔的,不过也搞不清有几个空格,或者是tab分割了,这里使用正则表达式s+就不怕多少空格和tab了*/          String[] arr = line.split("\\s+");          IntWritable outputKey = new IntWritable(Integer.valueOf(arr[0]));          Text outputValue = new Text();          outputValue.set(arr[1]);          context.write(outputKey, outputValue);      }  }

reduce代码如下:

import java.io.IOException;    import org.apache.hadoop.io.IntWritable;  import org.apache.hadoop.io.Text;  import org.apache.hadoop.mapreduce.Reducer;    /* reduce代码很让人吃惊吧,就是把map结果原样输出即可 */  public class MrByAreaSortReduce extends          Reducer<IntWritable, Text, IntWritable, Text> {        @Override      protected void reduce(IntWritable key, Iterable<Text> values,              Context context) throws IOException, InterruptedException {          for (Text text : values){              context.write(key, text);          }      }    }

代码里的注释对代码逻辑进行了详细的解释,这里就不累述了。

下面就是调用两个mapreduce的main函数了,也就是我们该如何执行mapreduce的方式,这个main函数还是非常有特点的,特点一就是两个mapreduce有迭代关系,具体就是第一个mapredcue执行完毕后第二个mapredcue才能执行,或者说第一个mapredcue的输出就是第二个mapredcue的输入,特点二就是排序计算里我们使用了map到reduce过程及shuffle过程里的默认排序机制,那么该机制运用可不是像mapreduce代码那么简单了,其实背后需要我们更加深入理解mapreduce的原理,这里我们直接看代码了,代码如下:

mport java.io.IOException;    import org.apache.hadoop.conf.Configuration;  import org.apache.hadoop.fs.Path;  import org.apache.hadoop.io.IntWritable;  import org.apache.hadoop.io.Text;  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  import org.apache.hadoop.mapreduce.Job;  import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;  import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;    public class MrByAreaJob {      public static void main(String[] args) throws IOException {          // 一个mapreduce就是一个job 一个job需要一个单独的Configuration,我开始让两个job公用Configuration,最后mr报错          Configuration conf01 = new Configuration();          ControlledJob conJobHB = new ControlledJob(conf01);                    // 下面代码很多文章里都会提到这里就不多说了          Job jobHB = new Job(conf01,"hb");          jobHB.setJarByClass(MrByAreaJob.class);          jobHB.setMapperClass(MrByAreaHBMap.class);          jobHB.setReducerClass(MrByAreaHBReduce.class);          jobHB.setMapOutputKeyClass(Text.class);          jobHB.setMapOutputValueClass(Text.class);          jobHB.setOutputKeyClass(Text.class);          jobHB.setOutputValueClass(Text.class);                    conJobHB.setJob(jobHB);                    FileInputFormat.addInputPath(jobHB, new Path(args[0]));          FileOutputFormat.setOutputPath(jobHB, new Path(args[1]));                    Configuration conf02 = new Configuration();          Job jobSort = new Job(conf02,"sort");          jobSort.setJarByClass(MrByAreaJob.class);          jobSort.setMapperClass(MrByAreaSortMap.class);          jobSort.setReducerClass(MrByAreaSortReduce.class);          // Partitioner是shuffle的一个步骤,一个Partitioner对应一个reduce          // 假如这个mapredue有多个reduce,我们如何保证排序的全局一致性,因此这里需要进行处理          jobSort.setPartitionerClass(PartitionByArea.class);          // map对数值排序默认是由小到大,但是需求是由大到小,因此需要我们改变这种排序          jobSort.setSortComparatorClass(IntKeyComparator.class);          jobSort.setMapOutputKeyClass(IntWritable.class);          jobSort.setMapOutputValueClass(Text.class);                    jobSort.setOutputKeyClass(IntWritable.class);          jobSort.setOutputValueClass(Text.class);                    ControlledJob conJobSort = new ControlledJob(conf02);          conJobSort.setJob(jobSort);                    // 这里添加job的依赖关系          conJobSort.addDependingJob(conJobHB);                    // 可以看到第一个mapreduce的输出就是第二个的输入          FileInputFormat.addInputPath(jobSort, new Path(args[1]));          FileOutputFormat.setOutputPath(jobSort, new Path(args[2]));                    // 主控制job          JobControl mainJobControl = new JobControl("mainHBSort");                    mainJobControl.addJob(conJobHB);          mainJobControl.addJob(conJobSort);                    Thread t = new Thread(mainJobControl);          t.start();                    while(true){              if (mainJobControl.allFinished()){                  System.out.println(mainJobControl.getSuccessfulJobList());                  mainJobControl.stop();                  break;              }          }      }  }

这里有两个类还没有介绍,一个是IntKeyComparator,这是为了保证排序的mapreduce结果是按数字由大到小排序,代码如下:

import org.apache.hadoop.io.IntWritable;  import org.apache.hadoop.io.WritableComparable;  import org.apache.hadoop.io.WritableComparator;    public class IntKeyComparator extends WritableComparator {        protected IntKeyComparator() {          super(IntWritable.class,true);      }        @Override      public int compare(WritableComparable a, WritableComparable b) {          return -super.compare(a, b);      }                }

另一个类就是PartitionByArea,这个是保证排序不会因为reduce设置的个数而不能保证排序的全局一致性,代码具体如下:

import org.apache.hadoop.io.IntWritable;  import org.apache.hadoop.io.Text;  import org.apache.hadoop.mapreduce.Partitioner;    public class PartitionByArea<IntWritable, Text> extends Partitioner<IntWritable, Text> {        @Override      public int getPartition(IntWritable key, Text value, int numReduceTasks) {          int maxValue = 50;          int keySection = 0;                    // numReduceTasks就是默认的reduce任务个数          if (numReduceTasks > 1 && key.hashCode() < maxValue){              int sectionValue = maxValue / (numReduceTasks - 1);              int count = 0;              while((key.hashCode() - sectionValue * count) > sectionValue){                  count++;              }              keySection = numReduceTasks - 1 - count;          }                    return keySection;      }    }

这里特别要讲解的是PartitionByArea,这个原理我花了好一段时间才理解过来,partition是map输出为reduce对应做的分区,一般一个partition对应一个reduce,如果我们将reduce任务槽设置为一个那么就不用更改Partition类,但是实际生产情况下reduce往往会配置多个,这个时候保证数据的整体排序就十分重要了,那么我们如何保证其数据的整体有序了,这个时候我们要找到输入数据的最大值,然后让最大值除以partition的数量的商值作为分割数据的边界,这样等分就可以保证数据的整体排序的有效性了。

现在所有的代码都介绍完毕了,下面就是我们该如何让这个代码运行了,我在写本代码时候使用的是ide是eclipse,不过我没有使用mapreduce插件,而是直接放在服务器上运行,下面我来描述下运行该mr的方式,具体如下:

首先我在装有hadoop服务的服务器上使用root用户创建一个属于我自己的文件夹,这里文件夹的名字叫做xiajun,我通过ftp将源文件传递到xiajun目录下的javafile文件夹,执行如下命令:

mkdir /xiajun/javafile  javac –classpath /home/hadoop/hadoop/hadoop-core-0.20.2-cdh3u4.jar –d /xiajun/javaclass /xiajun/ javafile/*.java

以上命令是编译源文件,将javafile文件夹的java代码编译到javaclass目录下。

Jar –cvf /xiajun/mymr.jar –C /xiajun/javaclass/ .

这里将javaclass目录下class文件打成jar包,放在xiajun目录下。

接下来我们使用hadoop用户登录:

su – hadoop

之所以使用root用户编译,打jar包原因是我的hadoop用户没有权限上传文件不得已而为之了。

我们首先将测试数据上传到HDFS上,接下来执行如下命令:

cd /hadoop/bin

切换目录到bin目录下,然后执行:

hadoop jar mymr.jar cn.com.TestMain  输入目录  输出目录

这里输入可以是具体文件也可以是目录,输出目录在HDFS上要不存在,如果存在hadoop会无法确认任务是否已经执行完毕,就会强制终止任务。

两个mapreduce迭代执行日志非常让人失望,因此如果我们发现任务无法正常执行,我现在都是一个个mapredcue执行查看错误日志。

最后我们看看应用服务应该如何调用这个mapreduce程序,这里我使用远程调用shell 的方式,代码如下:

import java.io.BufferedReader;  import java.io.IOException;  import java.io.InputStream;  import java.io.InputStreamReader;    import ch.ethz.ssh2.Connection;  import ch.ethz.ssh2.Session;      public class TestMain {        /**       * @param args       * @throws IOException        * @throws ClassNotFoundException        * @throws InterruptedException        */      public static void main(String[] args) {          String hostname = "192.168.1.200";          String username = "hadoop";          String pwd = "hadoop";                    Connection conn = new Connection(hostname);          Session sess = null;          long begin = System.currentTimeMillis();          try {              conn.connect();              boolean isAuthenticated = conn.authenticateWithPassword(username, pwd);              sess = conn.openSession();              sess.execCommand("cd hadoop/bin && hadoop jar /xiajun/mymr.jar com.test.mr.MrByAreaJob /xiajun/areaHBinput /xiajun/areaHBoutput58 /xiajun/areaHBoutput68");                            InputStream stdout = sess.getStdout();              BufferedReader br = new BufferedReader(new InputStreamReader(stdout));              StringBuilder sb = new StringBuilder();                            while(true){                  String line = br.readLine();                  if (line == null) break;                  sb.append(line);              }                            System.out.println(sb.toString());                            long end = System.currentTimeMillis();              System.out.println("耗时:" + (begin - end)/1000 + "秒");          } catch (IOException e) {              e.printStackTrace();          }finally{              sess.close();              conn.close();          }      }    }

好了,本文就此结束了。