Spark中的rollup
jopen
9年前
Spark中的rollup
在对数据进行小计或合计运算时,rollup和cube一样,算是常用的操作了。Spark的DataFrame提供了rollup函数支持此功能。
假设准备了如下数据:
trait SalesDataFrameFixture extends DataFrameFixture with SparkSqlSupport { implicit class StringFuncs(str: String) { def toTimestamp = new Timestamp(Date.valueOf(str).getTime) } import sqlContext.implicits._ val sales = Seq( (1, "Widget Co", 1000.00, 0.00, "广东省", "深圳市", "2014-02-01".toTimestamp), (2, "Acme Widgets", 1000.00, 500.00, "四川省", "成都市", "2014-02-11".toTimestamp), (3, "Acme Widgets", 1000.00, 500.00, "四川省", "绵阳市", "2014-02-12".toTimestamp), (4, "Acme Widgets", 1000.00, 500.00, "四川省", "成都市", "2014-02-13".toTimestamp), (5, "Widget Co", 1000.00, 0.00, "广东省", "广州市", "2015-01-01".toTimestamp), (6, "Acme Widgets", 1000.00, 500.00, "四川省", "泸州市", "2015-01-11".toTimestamp), (7, "Widgetry", 1000.00, 200.00, "四川省", "成都市", "2015-02-11".toTimestamp), (8, "Widgets R Us", 3000.00, 0.0, "四川省", "绵阳市", "2015-02-19".toTimestamp), (9, "Widgets R Us", 2000.00, 0.0, "广东省", "深圳市", "2015-02-20".toTimestamp), (10, "Ye Olde Widgete", 3000.00, 0.0, "广东省", "深圳市", "2015-02-28".toTimestamp), (11, "Ye Olde Widgete", 3000.00, 0.0, "广东省", "广州市", "2015-02-28".toTimestamp) ) val saleDF = sqlContext.sparkContext.parallelize(sales, 4).toDF("id", "name", "sales", "discount", "province", "city", "saleDate") }
注册临时表,并执行SQL语句:
saleDF.registerTempTable("sales") val dataFrame = sqlContext.sql("select province,city,sales from sales") dataFrame.show
执行的结果如下:
| province |city | sales | |----------|-----|-------| | 广东省| 深圳市|1000.0| | 四川省| 成都市|1000.0| | 四川省| 绵阳市|1000.0| | 四川省| 成都市|1000.0| | 广东省| 广州市|1000.0| | 四川省| 泸州市|1000.0| | 四川省| 成都市|1000.0| | 四川省| 绵阳市|3000.0| | 广东省| 深圳市|2000.0| | 广东省| 深圳市|3000.0| | 广东省| 广州市|3000.0|
对该DataFrame执行rollup:
val resultDF = dataFrame.rollup($"province", $"city").agg(Map("sales" -> "sum")) resultDF.show
在这个例子中,rollup操作相当于对dataFrame中的province与city进行分组,并在此基础上针对sales进行求和运算,故而获得的结果为:
|province|city|sum(sales)| |--------|----|----------| | null|null| 18000.0| | 广东省|null| 10000.0| | 广东省| 深圳市| 6000.0| | 四川省|null| 8000.0| | 四川省| 成都市| 3000.0| | 四川省| 绵阳市| 4000.0| | 广东省| 广州市| 4000.0| | 四川省| 泸州市| 1000.0|
操作非常简单,然而遗憾地是并不符合我们产品的场景,因为我们需要根据某些元数据直接组装为Spark SQL的sql语句。在Spark的hiveContext中,支持这样的语法:
hiveContext.sql("select province, city, sum(sales) from sales group by province, city with rollup")
可惜,SQLContext并不支持这一功能。我在Spark User Mailing List中咨询了这个问题。Intel的Cheng Hao(Spark的一位非常活跃的contributer)告诉了我为何不支持的原因。因为在Spark SQL 1.x版本中,对SQL语法的解析采用了Scala的Parser机制。这种实现方式较弱,对语法的解析支持不够。Spark的Issue #5080尝试提供此功能,然而并没有被合并到Master中。Spark并不希望在1.x版本的SQLParser中添加新的关键字,它的计划是在Spark 2.0中用HQL Parser来替代目前较为简陋的SQL Parser。
如果希望在sql中使用rollup,那么有三个选择:
- 使用HQLContext;
- pull #5080的代码,自己建立一个Spark的分支;
- 等待Spark 2.0版本发布。
来自: http://zhangyi.farbox.com/post/kai-yuan-kuang-jia/rollup-in-spark