Controlling reduce output in MapReduce programs
1. In Hadoop, a reduce supports multiple outputs, and the output file names are controllable: extend MultipleTextOutputFormat and override its generateFileNameForKeyValue method. In the newer Hadoop API, job parameters are set through the Job class, but calling Job.setOutputFormatClass() with MultipleTextOutputFormat fails, because the class passed there must extend org.apache.hadoop.mapreduce.OutputFormat, while MultipleTextOutputFormat still belongs to the old org.apache.hadoop.mapred API. This is one of the more serious bugs in 0.20.2; upgrading to 0.21 resolves it.

2. If the same row of data needs to be written to several files at once, we can use the MultipleOutputs class, which maintains a map of <name, OutputCollector> pairs. We register the named outputs in the job configuration, then in the map or reduce method fetch the corresponding collector and call its collect() method.

The first example splits the reduce output into per-key files with MultipleTextOutputFormat:

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.hadoop.compression.lzo.LzopCodec;

public class LzoHandleLogMr extends Configured implements Tool {

    static class LzoHandleLogMapper extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, Text> {

        // Key each record by its first comma-separated field; the whole
        // line passes through as the value.
        public void map(LongWritable key, Text value,
                OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {
            try {
                String[] sp = value.toString().split(",");
                output.collect(new Text(sp[0]), value);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    static class LzoHandleLogReducer extends MapReduceBase
            implements Reducer<Text, Text, Text, NullWritable> {

        @Override
        public void reduce(Text key, Iterator<Text> values,
                OutputCollector<Text, NullWritable> output, Reporter reporter)
                throws IOException {
            while (values.hasNext()) {
                output.collect(values.next(), NullWritable.get());
            }
        }
    }

    public static class LogNameMultipleTextOutputFormat
            extends MultipleTextOutputFormat<Text, NullWritable> {

        // Derive the output file name from the first field of the key.
        @Override
        protected String generateFileNameForKeyValue(Text key,
                NullWritable value, String name) {
            String[] sp = key.toString().split(",");
            String filename = sp[0];
            if (sp[0].contains(".")) {
                filename = "000000000000";
            }
            return filename;
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        JobConf jobconf = new JobConf(LzoHandleLogMr.class);
        jobconf.setMapperClass(LzoHandleLogMapper.class);
        jobconf.setReducerClass(LzoHandleLogReducer.class);
        jobconf.setOutputFormat(LogNameMultipleTextOutputFormat.class);
        jobconf.setOutputKeyClass(Text.class);
        // Declare the value types explicitly: Text out of the map,
        // NullWritable out of the reduce.
        jobconf.setOutputValueClass(NullWritable.class);
        jobconf.setMapOutputValueClass(Text.class);
        jobconf.setNumReduceTasks(12);
        FileInputFormat.setInputPaths(jobconf, new Path(args[0]));
        FileOutputFormat.setOutputPath(jobconf, new Path(args[1]));
        FileOutputFormat.setCompressOutput(jobconf, true);
        FileOutputFormat.setOutputCompressorClass(jobconf, LzopCodec.class);
        JobClient.runJob(jobconf);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new LzoHandleLogMr(), args));
    }
}
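For the new API, a rough equivalent of this per-key file naming is the new MultipleOutputs class (org.apache.hadoop.mapreduce.lib.output.MultipleOutputs, available from 0.21 onward), whose write(key, value, baseOutputPath) overload chooses the output file per record. The sketch below is illustrative only and assumes Hadoop 0.21 or later; the class name NewApiReducer and the field mos are ours, not from the original post.

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class NewApiReducer extends Reducer<Text, Text, Text, NullWritable> {

    private MultipleOutputs<Text, NullWritable> mos;

    @Override
    protected void setup(Context context) {
        mos = new MultipleOutputs<Text, NullWritable>(context);
    }

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text value : values) {
            // The third argument is a base path relative to the job output
            // directory; the framework appends "-r-00000" etc., so each key
            // lands in its own file, much like generateFileNameForKeyValue.
            mos.write(value, NullWritable.get(), key.toString());
        }
    }

    @Override
    protected void cleanup(Context context)
            throws IOException, InterruptedException {
        mos.close();
    }
}

Since this reducer writes only through mos, the job's default output format still creates (possibly empty) part-r files; org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat can be used in the driver to suppress them.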
The second example is a map-only job that uses MultipleOutputs to write every input line into two named outputs, chrono and geo:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.MultipleOutputs;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MultiFile extends Configured implements Tool {

    public static class MapClass extends MapReduceBase
            implements Mapper<LongWritable, Text, NullWritable, Text> {

        private MultipleOutputs mos;
        private OutputCollector<NullWritable, Text> collector;

        public void configure(JobConf conf) {
            mos = new MultipleOutputs(conf);
        }

        // Project each input line into two records and send each to its
        // own named output.
        public void map(LongWritable key, Text value,
                OutputCollector<NullWritable, Text> output, Reporter reporter)
                throws IOException {
            String[] arr = value.toString().split(",", -1);
            String chrono = arr[0] + "," + arr[1] + "," + arr[2];
            String geo = arr[0] + "," + arr[4] + "," + arr[5];
            collector = mos.getCollector("chrono", reporter);
            collector.collect(NullWritable.get(), new Text(chrono));
            collector = mos.getCollector("geo", reporter);
            collector.collect(NullWritable.get(), new Text(geo));
        }

        public void close() throws IOException {
            // Close the named outputs so their files are flushed.
            mos.close();
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        JobConf job = new JobConf(conf, MultiFile.class);

        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);

        job.setJobName("MultiFile");
        job.setMapperClass(MapClass.class);
        job.setInputFormat(TextInputFormat.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        job.setNumReduceTasks(0); // map-only job

        // Register the two named outputs used in map() above.
        MultipleOutputs.addNamedOutput(job, "chrono", TextOutputFormat.class,
                NullWritable.class, Text.class);
        MultipleOutputs.addNamedOutput(job, "geo", TextOutputFormat.class,
                NullWritable.class, Text.class);

        JobClient.runJob(job);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new MultiFile(), args));
    }
}
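With this configuration, each map task produces the named output files chrono-m-xxxxx and geo-m-xxxxx in the job output directory, next to the usual part-m-xxxxx file; since map() never writes to the standard collector, the part files stay empty.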
Reposted from: http://blog.csdn.net/liuzhoulong/article/details/7294397