Oozie 4.2.0 Installation and Configuration in Practice
Source: http://blog.csdn.net/fansy1990/article/details/50570518
Tasks run on Hadoop sometimes need several Map/Reduce jobs chained together to reach their goal.[1] Within the Hadoop ecosystem there is a relatively new component called Oozie[2], which lets us combine multiple Map/Reduce jobs into a single logical unit of work and thereby tackle larger tasks. This article introduces Oozie and some of the ways it can be used.
What is Oozie?
Oozie is a Java web application that runs in a Java servlet container (Tomcat) and uses a database to store:
- workflow definitions
- currently running workflow instances, including each instance's state and variables
An Oozie workflow is a set of actions (for example Hadoop Map/Reduce jobs, Pig jobs, and so on) arranged in a control-dependency DAG (Directed Acyclic Graph), which specifies the order in which the actions execute. The graph is described in hPDL, an XML process definition language.
Software versions:
Oozie 4.2.0, Hadoop 2.6.0, Spark 1.4.1, Hive 0.14, Pig 0.15.0, Maven 3.2, JDK 1.7, ZooKeeper 3.4.6, HBase 1.1.2, MySQL 5.6
Cluster layout:
node1~4.centos.com (node1~4), 192.168.0.31~34; four virtual machines, each with 1 GB of RAM and 1 CPU core
node1: NameNode, ResourceManager
node2: SecondaryNameNode, Master, HMaster, HistoryServer, JobHistoryServer
node3: oozie-server (Tomcat), DataNode, NodeManager, HRegionServer, Worker, QuorumPeerMain
node4: DataNode, NodeManager, HRegionServer, Worker, Pig client, Hive client, HiveServer2, QuorumPeerMain, MySQL
1. Building Oozie 4.2.0
1.1 Preparing the build environment
2) Modify pom.xml
In /usr/local/oozie/oozie-4.2.0/distro/pom.xml, change the bundled Tomcat download from Tomcat 6 to Tomcat 7:
<get src="http://archive.apache.org/dist/tomcat/tomcat-6 ==>
<get src="http://archive.apache.org/dist/tomcat/tomcat-7
To speed up dependency downloads, a Maven mirror can also be configured (typically in Maven's settings.xml), for example:
<mirror>
    <id>nexus-osc</id>
    <name>OSChina Central</name>
    <url>http://maven.oschina.net/content/groups/public/</url>
    <mirrorOf>*</mirrorOf>
</mirror>
1.2 Building
bin/mkdistro.sh -DskipTests -Phadoop-2 -Dhadoop.auth.version=2.6.0 -Ddistcp.version=2.6.0 -Dspark.version=1.4.1 -Dpig.version=0.15.0 -Dtomcat.version=7.0.52

Adding HBase or Hive and pinning them to newer versions makes the build fail, for example:

# bin/mkdistro.sh -DskipTests -Phadoop-2 -Dhadoop.auth.version=2.6.0 -Ddistcp.version=2.6.0 -Dspark.version=1.4.1 -Dpig.version=0.15.0 -Dtomcat.version=7.0.52 -Dhive.version=0.14.0 -Dhbase.version=1.1.2   ## does not compile with Hive and HBase pinned to these newer versions
1.3 Modifying the HDFS configuration
Add the following proxy-user properties to Hadoop's core-site.xml, where [USER] is the user that will later start the Oozie Tomcat:

<property>
    <name>hadoop.proxyuser.[USER].hosts</name>
    <value>*</value>
</property>
<property>
    <name>hadoop.proxyuser.[USER].groups</name>
    <value>*</value>
</property>

Then refresh the settings without restarting the cluster:

hdfs dfsadmin -refreshSuperUserGroupsConfiguration
yarn rmadmin -refreshSuperUserGroupsConfiguration
1.4 Configuring Oozie
1) After the build succeeds, the distribution package is produced at oozie-4.2.0/distro/target/oozie-4.2.0-distro.tar.gz; copy it to the node that will run the Oozie server (node3 in this cluster).
2) Unpack it:
tar -zxf oozie-4.2.0-distro.tar.gz
Create a libext directory under the unpacked Oozie directory, copy ext-2.2.zip (the ExtJS library used by the Oozie web console) into it,
and copy the Hadoop jars into the same directory:
cp $HADOOP_HOME/share/hadoop/*/*.jar libext/
cp $HADOOP_HOME/share/hadoop/*/lib/*.jar libext/
Remove the jars that conflict with Tomcat (rename them inside libext):
mv servlet-api-2.5.jar servlet-api-2.5.jar.bak
mv jsp-api-2.1.jar jsp-api-2.1.jar.bak
mv jasper-compiler-5.5.23.jar jasper-compiler-5.5.23.jar.bak
mv jasper-runtime-5.5.23.jar jasper-runtime-5.5.23.jar.bak
Copy the MySQL JDBC driver into libext as well (MySQL is used here instead of the default Derby database):
scp mysql-connector-java-5.1.25-bin.jar node3:/usr/oozie/oozie-4.2.0/libext/
Next, edit conf/oozie-site.xml to point Oozie at the MySQL database and at the Hadoop configuration directory:

<property>
    <name>oozie.service.JPAService.create.db.schema</name>
    <value>true</value>
</property>
<property>
    <name>oozie.service.JPAService.jdbc.driver</name>
    <value>com.mysql.jdbc.Driver</value>
</property>
<property>
    <name>oozie.service.JPAService.jdbc.url</name>
    <value>jdbc:mysql://node4:3306/oozie?createDatabaseIfNotExist=true</value>
</property>
<property>
    <name>oozie.service.JPAService.jdbc.username</name>
    <value>root</value>
</property>
<property>
    <name>oozie.service.JPAService.jdbc.password</name>
    <value>root</value>
</property>
<property>
    <name>oozie.service.HadoopAccessorService.hadoop.configurations</name>
    <value>*=/usr/hadoop/hadoop-2.6.0/etc/hadoop</value>
</property>
a. Generate the oozie.war:
bin/oozie-setup.sh prepare-war
b. Initialize the database
bin/ooziedb.sh create -sqlfile oozie.sql -run
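Optionally, the newly created schema can be checked against the MySQL server on node4 (a quick sanity check using the credentials configured above):

mysql -h node4 -u root -p -e "USE oozie; SHOW TABLES;"   # should list tables such as WF_JOBS and WF_ACTIONS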
c. Edit the oozie-4.2.0/oozie-server/conf/server.xml file and comment out the following line:
<!--<Listener className="org.apache.catalina.mbeans.ServerLifecycleListener" />-->
d. Upload the sharelib jars to HDFS
bin/oozie-setup.sh sharelib create -fs hdfs://node1:8020
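Once that completes, the shared libraries should be visible on HDFS under the submitting user's home directory (root here):

hdfs dfs -ls /user/root/share/lib    # expect a lib_<timestamp> directory containing hive/, pig/, spark/, ... subdirectories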
1.5 Starting Oozie
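The post gives no commands for this step; with the scripts shipped in the Oozie distribution, starting the server and checking that it came up typically looks like this (the URL assumes the server runs on node3 with the default port 11000):

bin/oozied.sh start                                       # start the embedded Tomcat running oozie.war
bin/oozie admin -oozie http://node3:11000/oozie -status   # expect "System mode: NORMAL"

The web console is then reachable at http://node3:11000/oozie (this is what ext-2.2.zip in libext is for).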
2. Workflow examples
2.1 MapReduce workflow
1. job.properties

oozie.wf.application.path=hdfs://node1:8020/user/root/workflow/mr_demo/wf
# Hadoop ResourceManager
jobTracker=node1:8032
# Hadoop fs.default.name
nameNode=hdfs://node1:8020/
# Hadoop mapred.queue.name
queueName=default
2. workflow.xml
<workflow-app xmlns="uri:oozie:workflow:0.2" name="map-reduce-wf">
    <start to="mr-node"/>
    <action name="mr-node">
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}/user/${wf:user()}/workflow/mr_demo/output"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
                <property>
                    <name>mapreduce.mapper.class</name>
                    <value>org.apache.hadoop.examples.WordCount$TokenizerMapper</value>
                </property>
                <property>
                    <name>mapreduce.reducer.class</name>
                    <value>org.apache.hadoop.examples.WordCount$IntSumReducer</value>
                </property>
                <property>
                    <name>mapred.map.tasks</name>
                    <value>1</value>
                </property>
                <property>
                    <name>mapred.input.dir</name>
                    <value>/user/${wf:user()}/bank.csv</value>
                </property>
                <property>
                    <name>mapred.output.dir</name>
                    <value>/user/${wf:user()}/workflow/mr_demo/output</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Map/Reduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>
3. Run it:
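The submission commands are not spelled out in the post; a typical sequence with the Oozie client is sketched below. The HDFS layout follows oozie.wf.application.path above, and the jar name is an assumption (WordCount ships in the Hadoop MapReduce examples jar):

# stage the workflow definition and the job jar in the application directory
hdfs dfs -mkdir -p /user/root/workflow/mr_demo/wf/lib
hdfs dfs -put -f workflow.xml /user/root/workflow/mr_demo/wf/
hdfs dfs -put -f hadoop-mapreduce-examples-2.6.0.jar /user/root/workflow/mr_demo/wf/lib/
# submit and start the workflow, then poll its status
oozie job -oozie http://node3:11000/oozie -config job.properties -run
oozie job -oozie http://node3:11000/oozie -info <job-id>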
2.2 Pig workflow
1. job.properties

oozie.wf.application.path=hdfs://node1:8020/user/root/workflow/pig_demo/wf
# required for Pig workflows
oozie.use.system.libpath=true
# Hadoop ResourceManager
resourceManager=node1:8032
# Hadoop fs.default.name
nameNode=hdfs://node1:8020/
# Hadoop mapred.queue.name
queueName=default
2. workflow.xml
<workflow-app xmlns="uri:oozie:workflow:0.2" name="whitehouse-workflow">
    <start to="transform_job"/>
    <action name="transform_job">
        <pig>
            <job-tracker>${resourceManager}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="/user/root/workflow/pig_demo/output"/>
            </prepare>
            <script>transform_job.pig</script>
        </pig>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Job failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>
3. transform_job.pig, the Pig script used by the workflow:
bank_data = LOAD '/user/root/bank.csv' USING PigStorage(';') AS (age:int, job:chararray, marital:chararray, education:chararray, default:chararray, balance:int, housing:chararray, loan:chararray, contact:chararray, day:int, month:chararray, duration:int, campaign:int, pdays:int, previous:int, poutcom:chararray, y:chararray);
age_gt_30 = FILTER bank_data BY age >= 30;
store age_gt_30 into '/user/root/workflow/pig_demo/output' using PigStorage(',');

4. Run it (see the sketch below; the approach is the same as in section 2.1).
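A possible submission sequence, analogous to section 2.1 (server location and paths are assumptions); exporting OOZIE_URL avoids repeating -oozie on every call, and the Pig script must sit next to workflow.xml in the application directory:

export OOZIE_URL=http://node3:11000/oozie
hdfs dfs -put -f workflow.xml transform_job.pig /user/root/workflow/pig_demo/wf/
oozie job -config job.properties -run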
2.3 Hive workflow
1. job.properties

nameNode=hdfs://node1:8020
jobTracker=node1:8032
queueName=default
maxAge=30
input=/user/root/bank.csv
output=/user/root/workflow/hive_demo/output
oozie.use.system.libpath=true
oozie.wf.application.path=${nameNode}/user/${user.name}/workflow/hive_demo/wf

2. workflow.xml
<workflow-app xmlns="uri:oozie:workflow:0.2" name="hive-wf">
    <start to="hive-node"/>
    <action name="hive-node">
        <hive xmlns="uri:oozie:hive-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${output}/hive"/>
                <mkdir path="${output}"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
            <script>script.hive</script>
            <param>INPUT=${input}</param>
            <param>OUTPUT=${output}/hive</param>
            <param>maxAge=${maxAge}</param>
        </hive>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Hive failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>

3. script.hive, the Hive script used by the workflow:
DROP TABLE IF EXISTS bank;
CREATE TABLE bank(
    age int, job string, marital string, education string,
    default string, balance int, housing string, loan string,
    contact string, day int, month string, duration int, campaign int,
    pdays int, previous int, poutcom string, y string
) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\073' STORED AS TEXTFILE;
LOAD DATA INPATH '${INPUT}' INTO TABLE bank;
INSERT OVERWRITE DIRECTORY '${OUTPUT}' SELECT * FROM bank WHERE age > '${maxAge}';

Note: '\073' is the octal escape for a semicolon, the field delimiter used in bank.csv.
4. Run it, as described above.
2.4 Hive 2 workflow (via HiveServer2)
1. job.properties

nameNode=hdfs://node1:8020
jobTracker=node1:8032
queueName=default
# JDBC URL of HiveServer2; set this when using the hive2 action
jdbcURL=jdbc:hive2://node4:10000/default
maxAge=30
input=/user/root/bank.csv
output=/user/root/workflow/hive2_demo/output
oozie.use.system.libpath=true
oozie.wf.application.path=${nameNode}/user/${user.name}/workflow/hive2_demo/wf
2. workflow.xml
<workflow-app xmlns="uri:oozie:workflow:0.5" name="hive2-wf">
    <start to="hive2-node"/>
    <action name="hive2-node">
        <hive2 xmlns="uri:oozie:hive2-action:0.1">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${output}/hive"/>
                <mkdir path="${output}"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
            <jdbc-url>${jdbcURL}</jdbc-url>
            <script>script2.hive</script>
            <param>INPUT=${input}</param>
            <param>OUTPUT=${output}/hive</param>
            <param>maxAge=${maxAge}</param>
        </hive2>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Hive2 failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>

3. script2.hive, the script used by the hive2 action:
DROP TABLE IF EXISTS bank2;
CREATE TABLE bank2(
    age int, job string, marital string, education string,
    default string, balance int, housing string, loan string,
    contact string, day int, month string, duration int, campaign int,
    pdays int, previous int, poutcom string, y string
) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\073' STORED AS TEXTFILE;
LOAD DATA INPATH '${INPUT}' INTO TABLE bank2;
INSERT OVERWRITE DIRECTORY '${OUTPUT}' SELECT * FROM bank2 WHERE age > '${maxAge}';
4. Run it, as described above.
2.5 Spark workflow
1. job.properties

nameNode=hdfs://node1:8020
jobTracker=node1:8032
#master=spark://node2:7077
master=spark://node2:6066
sparkMode=cluster
queueName=default
oozie.use.system.libpath=true
input=/user/root/bank.csv
output=/user/root/workflow/spark_demo/output
# the jar file must be local
jarPath=${nameNode}/user/root/workflow/spark_demo/lib/oozie-examples.jar
oozie.wf.application.path=${nameNode}/user/${user.name}/workflow/spark_demo/wf

Because sparkMode is set to cluster, the master URL above must use Spark's REST submission port 6066 rather than 7077.

2. workflow.xml
<workflow-app xmlns='uri:oozie:workflow:0.5' name='SparkFileCopy'>
    <start to='spark-node'/>
    <action name='spark-node'>
        <spark xmlns="uri:oozie:spark-action:0.1">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${output}"/>
            </prepare>
            <master>${master}</master>
            <mode>${sparkMode}</mode>
            <name>Spark-FileCopy</name>
            <class>org.apache.oozie.example.SparkFileCopy</class>
            <jar>${jarPath}</jar>
            <arg>${input}</arg>
            <arg>${output}</arg>
        </spark>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name='end'/>
</workflow-app>
3. Run it:
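Again only a sketch (it assumes the example jar has been built locally as oozie-examples.jar); the jar goes into the lib directory referenced by jarPath in job.properties:

hdfs dfs -mkdir -p /user/root/workflow/spark_demo/lib /user/root/workflow/spark_demo/wf
hdfs dfs -put -f oozie-examples.jar /user/root/workflow/spark_demo/lib/
hdfs dfs -put -f workflow.xml /user/root/workflow/spark_demo/wf/
oozie job -oozie http://node3:11000/oozie -config job.properties -run
hdfs dfs -cat /user/root/workflow/spark_demo/output/* | head    # SparkFileCopy writes a copy of bank.csv here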
2.6 Spark on YARN workflow
1. job.properties

nameNode=hdfs://node1:8020
jobTracker=node1:8032
#master=spark://node2:7077
#master=spark://node2:6066
master=yarn-cluster
#sparkMode=cluster
queueName=default
oozie.use.system.libpath=true
input=/user/root/bank.csv
output=/user/root/workflow/sparkonyarn_demo/output
jarPath=${nameNode}/user/root/workflow/sparkonyarn_demo/lib/oozie-examples.jar
oozie.wf.application.path=${nameNode}/user/${user.name}/workflow/sparkonyarn_demo

2. workflow.xml:
<workflow-app xmlns='uri:oozie:workflow:0.5' name='SparkFileCopy_on_yarn'>
    <start to='spark-node'/>
    <action name='spark-node'>
        <spark xmlns="uri:oozie:spark-action:0.1">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${output}"/>
            </prepare>
            <master>${master}</master>
            <name>Spark-FileCopy-on-yarn</name>
            <class>org.apache.oozie.example.SparkFileCopy</class>
            <jar>${jarPath}</jar>
            <spark-opts>--conf spark.yarn.historyServer.address=http://node2:18080 --conf spark.eventLog.dir=hdfs://node1:8020/spark-log --conf spark.eventLog.enabled=true</spark-opts>
            <arg>${input}</arg>
            <arg>${output}</arg>
        </spark>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name='end'/>
</workflow-app>
3. Run it, as in section 2.5.