Hadoop DistributedCache分布式缓存的使用

jopen 10年前

做项目的时候遇到一个问题,在Mapper和Reducer方法中处理目标数据时,先要去检索和匹配一个已存在的标签库,再对所处理的字段打标签。因为标签库不是很大,没必要用HBase。我的实现方法是把标签库存储成HDFS上的文件,用分布式缓存存储,这样让每个slave都能读取到这个文件。

main方法中的配置:

//分布式缓存要存储的文件路径  String cachePath[] = {                  "hdfs://10.105.32.57:8020/user/ad-data/tag/tag-set.csv",                  "hdfs://10.105.32.57:8020/user/ad-data/tag/TagedUrl.csv"          };  //向分布式缓存中添加文件          job.addCacheFile(new Path(cachePath[0]).toUri());          job.addCacheFile(new Path(cachePath[1]).toUri());

参考上面代码即可向分布式缓存中添加文件。

在Mapper和Reducer方法中读取分布式缓存文件:

/*   * 重写Mapper的setup方法,获取分布式缓存中的文件   */      @Override      protected void setup(Mapper<LongWritable, Text, Text, Text>.Context context)                     throws IOException, InterruptedException {          // TODO Auto-generated method stub          super.setup(context);          URI[] cacheFile = context.getCacheFiles();          Path tagSetPath = new Path(cacheFile[0]);          Path tagedUrlPath = new Path(cacheFile[1]);          文件操作(如把内容读到set或map中);      }    @Override  public void map(LongWritable key, Text value, Context context)              throws IOException, InterruptedException {              在map()中使用读取出的数据;        }

同样,如果在Reducer中也要读取分布式缓存文件,示例如下:

/*   * 重写Reducer的setup方法,获取分布式缓存中的文件   */      @Override      protected void setup(Context context)                      throws IOException, InterruptedException {          super.setup(context);          mos = new MultipleOutputs<Text, Text>(context);            URI[] cacheFile = context.getCacheFiles();          Path tagSetPath = new Path(cacheFile[0]);          Path tagSetPath = new Path(cacheFile[1]);          文件读取操作;      }     @Override    public void reduce(Text key, Iterable<Text> values, Context context)                throws IOException, InterruptedException {        while(values.iterator().hasNext()){            使用读取出的数据;        }         context.write(key, new Text(sb.toString()));        }
来源:Liu Yan的博客