hadoop 里执行 MapReduce 任务的几种方式
jopen
12年前
说明:
测试文件:
echo -e "aa\tbb \tcc bb\tcc\tdd" > 3.txt
hadoop fs -mkdir /data hadoop fs -put 3.txt /data
全文的例子均以该文件做测试用例,统计单词出现的次数(WordCount)。
1、最原始的方式:java 源码编译打包成jar包后,由 hadoop 脚本调度执行,类似:
bin/hadoop jar /tmp/wordcount.jar WordCount /tmp/input /tmp/output
java 代码 100 多行,我就不贴代码了,具体请见官方范例:
2、MR 脚本开发语言:pig
A1 = load '/data/3.txt'; A = stream A1 through `sed "s/\t/ /g"`; B = foreach A generate flatten(TOKENIZE((chararray)$0)) as word; C = filter B by word matches '\\w+'; D = group C by word; E = foreach D generate COUNT(C), group; dump E;
注意:不同分隔符对load及后面的$0的影响。
详情请见:
https://gist.github.com/186460
http://www.slideshare.net/erikeldridge/a-brief-handson-introduction-to-hadoop-pig
3、构建数据仓库的类 SQL 开发语言:hive
create table textlines(text string); load data inpath '/data/3.txt' overwrite into table textlines; SELECT wordColumn, count(1) FROM textlines LATERAL VIEW explode(split(text,'\t+')) wordTable AS wordColumn GROUP BY wordColumn;
详情请见:
http://my.oschina.net/leejun2005/blog/83045
http://blog.csdn.net/techdo/article/details/7433222
4、跨平台的脚本语言:python
map:
#!/usr/bin/python import os,re,sys for line in sys.stdin: for i in line.strip().split("\t"): print i
reduce:
#!/usr/bin/python import os,re,sys arr = {} for words in sys.stdin: word = words.strip() if word not in arr: arr[word] = 1 else: arr[word] += 1 for k, v in arr.items(): print str(k) + ": " + str(v)
最后在shell下执行:
hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-0.20.203.0.jar -file map.py -file reduce.py -mapper map.py -reducer reduce.py -input /data/3.txt -output /data/py
注意:脚本开头需要显示指定何种解释器以及赋予脚本执行权限
详情请见:
http://blog.csdn.net/jiedushi/article/details/7390015
5、Linux 下的瑞士军刀:shell 脚本
map:
#!/bin/bash tr '\t' '\n'
reduce:
#!/bin/bash sort|uniq -c
最后在shell下执行:
june@deepin:~/hadoop/hadoop-0.20.203.0/tmp> hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-0.20.203.0.jar -file map.py -file reduce.py -mapper map.py -reducer reduce.py -input /data/3.txt -output /data/py packageJobJar: [map.py, reduce.py, /home/june/data_hadoop/tmp/hadoop-unjar2676221286002400849/] [] /tmp/streamjob8722854685251202950.jar tmpDir=null 12/10/14 21:57:00 INFO mapred.FileInputFormat: Total input paths to process : 1 12/10/14 21:57:00 INFO streaming.StreamJob: getLocalDirs(): [/home/june/data_hadoop/tmp/mapred/local] 12/10/14 21:57:00 INFO streaming.StreamJob: Running job: job_201210141552_0041 12/10/14 21:57:00 INFO streaming.StreamJob: To kill this job, run: 12/10/14 21:57:00 INFO streaming.StreamJob: /home/june/hadoop/hadoop-0.20.203.0/bin/../bin/hadoop job -Dmapred.job.tracker=localhost:9001 -kill job_201210141552_0041 12/10/14 21:57:00 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201210141552_0041 12/10/14 21:57:01 INFO streaming.StreamJob: map 0% reduce 0% 12/10/14 21:57:13 INFO streaming.StreamJob: map 67% reduce 0% 12/10/14 21:57:19 INFO streaming.StreamJob: map 100% reduce 0% 12/10/14 21:57:22 INFO streaming.StreamJob: map 100% reduce 22% 12/10/14 21:57:31 INFO streaming.StreamJob: map 100% reduce 100% 12/10/14 21:57:37 INFO streaming.StreamJob: Job complete: job_201210141552_0041 12/10/14 21:57:37 INFO streaming.StreamJob: Output: /data/py june@deepin:~/hadoop/hadoop-0.20.203.0/tmp> hadoop fs -cat /data/py/part-00000 1 aa 1 bb 1 bb 2 cc 1 dd june@deepin:~/hadoop/hadoop-0.20.203.0/tmp>
特别提示:上述有些方法对字段后的空格忽略或计算,请注意仔细甄别。
说明:列举了上述几种方法主要是给大家一个不同的思路,
在解决问题的过程中,开发效率、执行效率都是我们需要考虑的,不要太局限某一种方法了。