MapReduce程序reduce输出控制

openkk 13年前

1,在hadoop中,reduce支持多个输出,输出的文件名也是可控的:继承MultipleTextOutputFormat类,并重写generateFileNameForKeyValue方法即可。

public class LzoHandleLogMr extends Configured implements Tool {      static class LzoHandleLogMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {                  public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter)      throws IOException {        try {            String[] sp = value.toString().split(",");         output.collect(new Text(sp[0]), value);        }catch (Exception e) {        e.printStackTrace();        }         }       }   static class LzoHandleLogReducer  extends MapReduceBase implements Reducer<Text, Text, Text, NullWritable> {                  @Override    public void reduce(Text key, Iterator<Text> values,      OutputCollector<Text, NullWritable> output, Reporter reporter)      throws IOException {     while (values.hasNext()) {           output.collect(values.next(), NullWritable.get());              }         }    }      public static class LogNameMultipleTextOutputFormat extends MultipleTextOutputFormat<Text, NullWritable>       {        @Override    protected String generateFileNameForKeyValue(Text key,      NullWritable value, String name) {     String sp[] = key.toString().split(",");     String filename = sp[0];     if(sp[0].contains(".")) filename="000000000000";     return filename;    }       }             @Override   public int run(String[] args) throws Exception {             JobConf jobconf = new JobConf(LzoHandleLogMr.class);        jobconf.setMapperClass(LzoHandleLogMapper.class);        jobconf.setReducerClass(LzoHandleLogReducer.class);        jobconf.setOutputFormat(LogNameMultipleTextOutputFormat.class);        jobconf.setOutputKeyClass(Text.class);        jobconf.setNumReduceTasks(12);                     FileInputFormat.setInputPaths(jobconf,new Path(args[0]));        FileOutputFormat.setOutputPath(jobconf,new Path(args[1]));        FileOutputFormat.setCompressOutput(jobconf, true);        FileOutputFormat.setOutputCompressorClass(jobconf, 
LzopCodec.class);                  JobClient.runJob(jobconf);         return 0;        }  }
</span>

在新版本的hadoop API中,是通过Job类来设置各种参数的。但是我调用Job.setOutputFormatClass()来使用MultipleTextOutputFormat的时候,竟然报错,原因是传入的类必须继承自org.apache.hadoop.mapreduce.OutputFormat(而MultipleTextOutputFormat属于旧的org.apache.hadoop.mapred API)。这是0.20.2中比较致命的bug之一,升级到0.21可以解决。

 

2, 如果同一行数据,需要同时输出至多个文件的话,我们可以使用MultipleOutputs类:

    public class MultiFile extends Confi gured implements Tool {            public static class MapClass extends MapReduceBase                implements Mapper<LongWritable, Text, NullWritable, Text> {                    private MultipleOutputs mos;                            private OutputCollector<NullWritable, Text> collector;                    public void confi gure(JobConf conf) {                        mos = new MultipleOutputs(conf);                    }                            public void map(LongWritable key, Text value,                            OutputCollector<NullWritable, Text> output,                            Reporter reporter) throws IOException {                        String[] arr = value.toString().split(",", -1);                        String chrono = arr[0] + "," + arr[1] + "," + arr[2];                        String geo = arr[0] + "," + arr[4] + "," + arr[5];                        collector = mos.getCollector("chrono", reporter);                        collector.collect(NullWritable.get(), new Text(chrono));                        collector = mos.getCollector("geo", reporter);                        collector.collect(NullWritable.get(), new Text(geo));                    }                            public void close() throws IOException {                        mos.close();                    }            }                    public int run(String[] args) throws Exception {                Confi guration conf = getConf();                JobConf job = new JobConf(conf, MultiFile.class);                Path in = new Path(args[0]);                Path out = new Path(args[1]);                FileInputFormat.setInputPaths(job, in);                FileOutputFormat.setOutputPath(job, out);                job.setJobName("MultiFile");                job.setMapperClass(MapClass.class);                job.setInputFormat(TextInputFormat.class);                job.setOutputKeyClass(NullWritable.class);                
job.setOutputValueClass(Text.class);                job.setNumReduceTasks(0);                MultipleOutputs.addNamedOutput(job,                        "chrono",                        TextOutputFormat.class,                        NullWritable.class,                        Text.class);                MultipleOutputs.addNamedOutput(job,                        "geo",                        TextOutputFormat.class,                        NullWritable.class,                        Text.class);                JobClient.runJob(job);                return 0;            }        }  

这个类维护了一个<name, OutputCollector>的map。我们可以在job配置里通过MultipleOutputs.addNamedOutput添加collector,然后在map或reduce方法中,通过getCollector取得对应的collector并调用collector.collect即可。

转自:http://blog.csdn.net/liuzhoulong/article/details/7294397