A few things about Hadoop and MySQL
From: http://www.cnblogs.com/JimLy-BUG/p/5177952.html
In the blink of an eye I have been working with Hadoop for two weeks, and I have gone from strongly resisting it to actually rather liking it. At the start, setting up the Hadoop development environment nearly made me give up; now I can write small programs against it. Growing a little every day feels good, and it adds one more weapon to my toolkit. Learning is about persistence. Keep at it!!!
Chinese New Year is just around the corner, and on the last working day before the holiday I got Hadoop talking to a MySQL database, which felt very satisfying. Below is the source code I wrote; I hope it helps you. If anything can be improved, please point it out, and I would also welcome study advice from Hadoop veterans: these are the honest words of a Hadoop beginner. My first submission was rejected for some reason, which instantly put me in a bad mood, but I will keep writing my blog...
The StudentRecord class:
package com.simope.mr.db;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.lib.db.DBWritable;

public class StudentRecord implements Writable, DBWritable {

    int id;
    String name;
    int age;
    int departmentID;

    @Override
    public void readFields(DataInput in) throws IOException {
        this.id = in.readInt();
        this.name = Text.readString(in);
        this.age = in.readInt();
        this.departmentID = in.readInt();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        // writeInt (not write) so the ints round-trip with readInt above
        out.writeInt(this.id);
        Text.writeString(out, this.name);
        out.writeInt(this.age);
        out.writeInt(this.departmentID);
    }

    public void readFields(ResultSet rs) throws SQLException {
        this.id = rs.getInt(1);
        this.name = rs.getString(2);
        this.age = rs.getInt(3);
        this.departmentID = rs.getInt(4);
    }

    public void write(PreparedStatement ps) throws SQLException {
        ps.setInt(1, this.id);
        ps.setString(2, this.name);
        ps.setInt(3, this.age);
        ps.setInt(4, this.departmentID);
    }

    @Override
    public String toString() {
        return this.name + "\t" + this.age + "\t" + this.departmentID;
    }
}
The TeacherRecord class:
package com.simope.mr.db;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.lib.db.DBWritable;

public class TeacherRecord implements Writable, DBWritable {

    int id;
    String name;
    int age;
    int departmentID;

    @Override
    public void readFields(DataInput in) throws IOException {
        this.id = in.readInt();
        this.name = Text.readString(in);
        this.age = in.readInt();
        this.departmentID = in.readInt();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        // writeInt (not write) so the ints round-trip with readInt above
        out.writeInt(this.id);
        Text.writeString(out, this.name);
        out.writeInt(this.age);
        out.writeInt(this.departmentID);
    }

    public void readFields(ResultSet rs) throws SQLException {
        this.id = rs.getInt(1);
        this.name = rs.getString(2);
        this.age = rs.getInt(3);
        this.departmentID = rs.getInt(4);
    }

    public void write(PreparedStatement ps) throws SQLException {
        ps.setInt(1, this.id);
        ps.setString(2, this.name);
        ps.setInt(3, this.age);
        ps.setInt(4, this.departmentID);
    }

    @Override
    public String toString() {
        return this.name + "\t" + this.age + "\t" + this.departmentID;
    }
}
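A detail worth checking with these record classes: write(DataOutput) and readFields(DataInput) must mirror each other exactly (writeInt pairs with readInt, Text.writeString with Text.readString), otherwise the shuffle deserializes garbage. The round-trip check below is a minimal sketch of my own, not part of the job code; it serializes a record to a byte array and reads it back:

package com.simope.mr.db;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class TeacherRecordRoundTrip {

    public static void main(String[] args) throws IOException {
        TeacherRecord before = new TeacherRecord();
        before.id = 1;
        before.name = "Tom";
        before.age = 30;
        before.departmentID = 2;

        // Serialize with write(DataOutput), then read back with readFields(DataInput)
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        before.write(new DataOutputStream(buf));

        TeacherRecord after = new TeacherRecord();
        after.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));

        // Prints "Tom\t30\t2" twice when the two methods are symmetric (toString omits id)
        System.out.println(before);
        System.out.println(after);
    }
}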
The DBMapper class:
package com.simope.mr.db;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class DBMapper extends MapReduceBase implements
        Mapper<LongWritable, TeacherRecord, LongWritable, Text> {

    public void map(LongWritable key, TeacherRecord value,
            OutputCollector<LongWritable, Text> collector, Reporter reporter)
            throws IOException {
        // Emit the row id as the key and "name\tage\tdepartmentID" as the value
        collector.collect(new LongWritable(value.id), new Text(value.toString()));
    }
}
The DBReducer class:
package com.simope.mr.db;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class DBReducer extends MapReduceBase implements
        Reducer<LongWritable, Text, StudentRecord, Text> {

    @Override
    public void reduce(LongWritable key, Iterator<Text> values,
            OutputCollector<StudentRecord, Text> output, Reporter reporter)
            throws IOException {
        // Each value is "name\tage\tdepartmentID", as produced by TeacherRecord.toString()
        String[] infoArr = values.next().toString().split("\t");
        StudentRecord s = new StudentRecord();
        // s.id = Integer.parseInt(infoArr[0]); // id is auto-increment, so it is not set here
        s.name = infoArr[0];
        s.age = Integer.parseInt(infoArr[1]);
        s.departmentID = Integer.parseInt(infoArr[2]);
        output.collect(s, new Text(s.name));
    }
}
The DBJob class (reads the contents of a database table and writes the data to an HDFS file): database table -> HDFS file
package com.simope.mr.db;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBInputFormat;

/**
 * Reads a database table and writes it into a file.
 * @author JimLy
 * @since 20160202
 */
public class DBJob {

    public static void main(String[] args) throws IOException {

        JobConf jobConf = new JobConf(DBJob.class);

        jobConf.setOutputKeyClass(LongWritable.class);
        jobConf.setOutputValueClass(Text.class);

        jobConf.setInputFormat(DBInputFormat.class);

        FileOutputFormat.setOutputPath(jobConf, new Path("/usr/output/db"));

        DBConfiguration.configureDB(jobConf, "com.mysql.jdbc.Driver",
                "jdbc:mysql://10.10.1.1:3306/my_hd", "root", "root");

        String[] fields = {"id", "name", "age", "departmentID"};

        // Query data from the teacher table in the my_hd database
        DBInputFormat.setInput(jobConf, TeacherRecord.class, "teacher", null, "id", fields);

        jobConf.setMapperClass(DBMapper.class);
        jobConf.setReducerClass(IdentityReducer.class);

        JobClient.runJob(jobConf);
    }
}
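For reference, the two tables these jobs assume can be created with a schema like the one below. The post never shows the DDL, so the column types are guesses inferred from the getInt/getString calls in the record classes, and CreateTables is just a hypothetical one-off helper:

package com.simope.mr.db;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CreateTables {

    public static void main(String[] args) throws Exception {
        Class.forName("com.mysql.jdbc.Driver");
        Connection conn = DriverManager.getConnection(
                "jdbc:mysql://10.10.1.1:3306/my_hd", "root", "root");
        Statement st = conn.createStatement();
        // teacher: the source table read by DBJob, DB2Job and DB4Job
        st.executeUpdate("CREATE TABLE IF NOT EXISTS teacher ("
                + "id INT AUTO_INCREMENT PRIMARY KEY, "
                + "name VARCHAR(50), age INT, departmentID INT)");
        // student: the target table written by DB4Job (DB3Job writes back to teacher)
        st.executeUpdate("CREATE TABLE IF NOT EXISTS student ("
                + "id INT AUTO_INCREMENT PRIMARY KEY, "
                + "name VARCHAR(50), age INT, departmentID INT)");
        st.close();
        conn.close();
    }
}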
The DB2Job class (reads database table contents via a custom SQL query and writes the data to an HDFS file): database table -> HDFS file
package com.simope.mr.db;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBInputFormat;

/**
 * Reads from the database with a custom query and writes to a file.
 * @author JimLy
 * @since 20160202
 */
public class DB2Job {

    public static void main(String[] args) throws IOException {

        JobConf jobConf = new JobConf(DB2Job.class);

        jobConf.setOutputKeyClass(LongWritable.class);
        jobConf.setOutputValueClass(Text.class);

        jobConf.setInputFormat(DBInputFormat.class);

        FileOutputFormat.setOutputPath(jobConf, new Path("/usr/output/db"));

        DBConfiguration.configureDB(jobConf, "com.mysql.jdbc.Driver",
                "jdbc:mysql://10.10.1.1:3306/my_hd", "root", "root");

        String inputQuery = "SELECT * FROM teacher WHERE id != 4";
        String inputCountQuery = "SELECT COUNT(1) FROM teacher WHERE id != 4";

        // Query data from the teacher table in the my_hd database
        DBInputFormat.setInput(jobConf, TeacherRecord.class, inputQuery, inputCountQuery);

        jobConf.setMapperClass(DBMapper.class);
        jobConf.setReducerClass(IdentityReducer.class);

        JobClient.runJob(jobConf);
    }
}
The DB3Job class (reads the contents of an HDFS file and writes the data into the specified database table): HDFS file -> database table
package com.simope.mr.db;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBOutputFormat;

/**
 * Reads a file and writes it into the database.
 * @author JimLy
 * @since 20160202
 */
public class DB3Job {

    public static void main(String[] args) throws IOException {

        JobConf jobConf = new JobConf(DB3Job.class);

        jobConf.setInputFormat(TextInputFormat.class);
        jobConf.setOutputFormat(DBOutputFormat.class);

        FileInputFormat.addInputPath(jobConf, new Path("/usr/input/db"));

        DBConfiguration.configureDB(jobConf, "com.mysql.jdbc.Driver",
                "jdbc:mysql://10.10.1.1:3306/my_hd", "root", "root");

        String[] fields = {"id", "name", "age", "departmentID"};

        DBOutputFormat.setOutput(jobConf, "teacher", fields);

        jobConf.setMapperClass(IdentityMapper.class);
        jobConf.setReducerClass(DBReducer.class);

        JobClient.runJob(jobConf);
    }
}
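Note that DB3Job pairs IdentityMapper with the DBReducer shown earlier, so every line of the files under /usr/input/db must carry name, age and departmentID separated by tabs. The rows below are made-up sample data, just to show the expected shape:

    Tom	30	2
    Lucy	28	1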
The DB4Job class (reads the specified database table and writes the data into another specified table): database table -> database table
package com.simope.mr.db;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBInputFormat;
import org.apache.hadoop.mapred.lib.db.DBOutputFormat;

/**
 * Reads a database table and writes it into another table.
 * @author JimLy
 * @since 20160202
 */
public class DB4Job {

    public static void main(String[] args) throws IOException {

        JobConf jobConf = new JobConf(DB4Job.class);

        jobConf.setOutputKeyClass(LongWritable.class);
        jobConf.setOutputValueClass(Text.class);

        jobConf.setInputFormat(DBInputFormat.class);
        jobConf.setOutputFormat(DBOutputFormat.class);

        DBConfiguration.configureDB(jobConf, "com.mysql.jdbc.Driver",
                "jdbc:mysql://10.10.1.1:3306/my_hd", "root", "root");

        String inputQuery = "SELECT * FROM teacher";
        String inputCountQuery = "SELECT COUNT(1) FROM teacher";

        // Query data from the teacher table in the my_hd database
        DBInputFormat.setInput(jobConf, TeacherRecord.class, inputQuery, inputCountQuery);

        String[] fields = {"id", "name", "age", "departmentID"};

        DBOutputFormat.setOutput(jobConf, "student", fields);

        jobConf.setMapperClass(DBMapper.class);
        jobConf.setReducerClass(DBReducer.class);

        JobClient.runJob(jobConf);
    }
}
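One practical point for all four jobs: the MySQL JDBC driver has to be on the classpath of every task, otherwise the job dies with ClassNotFoundException: com.mysql.jdbc.Driver. A common way to handle this with the old mapred API is to ship the connector JAR through the DistributedCache; the HDFS path below is only a placeholder for wherever you uploaded the JAR:

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;

// Inside main(), before JobClient.runJob(jobConf):
// ship the connector JAR (placeholder HDFS path) to every task's classpath
DistributedCache.addFileToClassPath(
        new Path("/lib/mysql-connector-java-5.1.38.jar"), jobConf);

Passing the JAR with the -libjars option when launching via hadoop jar works too, provided the job parses generic options through ToolRunner.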
If you think this post is decent, please click "recommend"; your recommendation is what keeps me writing this blog...
If you repost this, please credit the source: http://www.cnblogs.com/JimLy-BUG/