Big Data Processing with Spark DataFrames
Introduction
DataFrames give Spark the ability to process large-scale structured data: they are easier to use than the original RDD transformations, and computation is roughly twice as fast. This small API carries Spark's ambition to unify the "big data landscape". A DataFrame acts like a channel that connects all the mainstream data sources and automatically converts them into a format that can be processed in parallel. Through it, Spark can serve every player in the big data ecosystem, whether data scientists who favor R, business analysts who live in SQL, or engineers who care about efficiency and real-time performance.
What the Examples Cover
An example of converting structured data into a DataFrame, registering it as a temporary table, and querying it with SQL
An example of reading data from an RDBMS into a DataFrame
An example of writing data back to an RDBMS (a minimal JDBC round-trip sketch follows this list; the full sample appears under "Code Sample")
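Before the full sample, here is a minimal sketch of the JDBC read and write paths, passing credentials through a java.util.Properties object instead of embedding them in the connection URL. The object name JdbcRoundTrip, the target table tbaclusterresult_copy, the host, database, and root credentials are placeholders mirroring the sample below, not fixed requirements.

import java.util.Properties
import org.apache.spark.SparkContext
import org.apache.spark.sql.{SQLContext, SaveMode}

object JdbcRoundTrip extends App {
  val sc = new SparkContext("local[*]", "jdbc-demo")
  val sqlc = new SQLContext(sc)

  // Hypothetical connection details; replace with your own host, database and credentials.
  val url = "jdbc:mysql://ip:3306/ding"
  val props = new Properties()
  props.setProperty("user", "root")
  props.setProperty("password", "root")

  // Read a table into a DataFrame (equivalent to the options-map form used in the full sample).
  val df = sqlc.read.jdbc(url, "tbaclusterresult", props)

  // Write it back to another table, overwriting it if it already exists.
  df.write.mode(SaveMode.Overwrite).jdbc(url, "tbaclusterresult_copy", props)
}

Keeping the user and password in a Properties object keeps the URL reusable across environments; the options-map form in the sample below achieves the same thing with "user" and "password" keys.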
Code Sample
import scala.collection.mutable.ArrayBuffer
import scala.io.Source
import java.io.PrintWriter
import util.control.Breaks._
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import java.sql.DriverManager
import java.sql.PreparedStatement
import java.sql.Connection
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.Row
import java.util.Properties
import org.apache.spark.sql.SaveMode

object SimpleDemo extends App {

  val sc = new SparkContext("local[*]", "test")
  val sqlc = new SQLContext(sc)
  val driverUrl = "jdbc:mysql://ip:3306/ding?user=root&password=root&zeroDateTimeBehavior=convertToNull&characterEncoding=utf-8"
  val tableName = "tbaclusterresult"

  // Convert structured data into a DataFrame and register it as a temporary table
  val df = sqlc.read.json("G:/data/json.txt")
  df.registerTempTable("user")
  val res = sqlc.sql("select * from user")
  println(res.count() + "---------------------------")
  res.collect().foreach(row => println(row.toString()))

  // Read data from MySQL
  val jdbcDF = sqlc.read
    .options(Map("url" -> driverUrl,
      // "user" -> "root",
      // "password" -> "root",
      "dbtable" -> tableName))
    .format("jdbc")
    .load()
  println(jdbcDF.count() + "---------------------------")
  jdbcDF.collect().foreach(row => println(row.toString()))

  // Write a DataFrame to MySQL
  val schema = StructType(
    StructField("name", StringType) ::
      StructField("age", IntegerType) :: Nil)
  val data1 = sc.parallelize(List(("blog1", 301), ("iteblog", 29), ("com", 40), ("bt", 33), ("www", 23)))
    .map(item => Row.apply(item._1, item._2))
  import sqlc.implicits._
  val df1 = sqlc.createDataFrame(data1, schema)
  // df1.write.jdbc(driverUrl, "sparktomysql", new Properties)
  df1.write.mode(SaveMode.Overwrite).jdbc(driverUrl, "testtable", new Properties)

  // The DataFrame class also provides an insertIntoJDBC method; the target table must already
  // exist, and it only inserts data. Its signature is:
  // def insertIntoJDBC(url: String, table: String, overwrite: Boolean): Unit

  // Insert data into MySQL row by row via foreachPartition
  val data = sc.parallelize(List(("www", 10), ("iteblog", 20), ("com", 30)))
  data.foreachPartition(myFun)

  case class Blog(name: String, count: Int)

  def myFun(iterator: Iterator[(String, Int)]): Unit = {
    var conn: Connection = null
    var ps: PreparedStatement = null
    val sql = "insert into blog(name, count) values (?, ?)"
    try {
      conn = DriverManager.getConnection(driverUrl, "root", "root")
      iterator.foreach(data => {
        ps = conn.prepareStatement(sql)
        ps.setString(1, data._1)
        ps.setInt(2, data._2)
        ps.executeUpdate()
      })
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if (ps != null) {
        ps.close()
      }
      if (conn != null) {
        conn.close()
      }
    }
  }
}
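The myFun helper above prepares a new statement for every record, which is fine for a demo but wasteful on larger partitions. A minimal sketch of a batched variant is shown below; insertPartition is a hypothetical helper name, and it assumes the same driverUrl, root credentials, and blog table used in the sample above.

import java.sql.{Connection, DriverManager, PreparedStatement}

// Batched variant of myFun: one PreparedStatement per partition, rows queued with addBatch.
def insertPartition(iterator: Iterator[(String, Int)]): Unit = {
  var conn: Connection = null
  var ps: PreparedStatement = null
  try {
    conn = DriverManager.getConnection(driverUrl, "root", "root")
    ps = conn.prepareStatement("insert into blog(name, count) values (?, ?)")
    iterator.foreach { case (name, count) =>
      ps.setString(1, name)
      ps.setInt(2, count)
      ps.addBatch() // queue the row instead of executing it immediately
    }
    ps.executeBatch() // send all queued rows in one round trip
  } catch {
    case e: Exception => e.printStackTrace()
  } finally {
    if (ps != null) ps.close()
    if (conn != null) conn.close()
  }
}

// Used exactly like myFun in the sample above:
// data.foreachPartition(insertPartition)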