Spark SQL 初探: 使用大数据分析2000万数据
目录 [−]
- 安装和配置Spark
- Spark初试
- 使用Spark SQL分析数据
去年网上曾放出个2000W的开房记录的数据库, 不知真假。 最近在学习Spark, 所以特意从网上找来数据测试一下, 这是一个绝佳的大数据素材。 如果数据涉及到个人隐私,请尽快删除, 本站不提供此类数据。你可以写个随机程序生成2000W的测试数据, 以CSV格式。
Spark是UC Berkeley AMP lab所开源的类Hadoop MapReduce的通用的并行计算框架,Spark基于map reduce算法实现的分布式计算,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出和结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更 好地适用于数据挖掘与机器学习等需要迭代的map reduce的算法。
Spark是一个高效的分布式计算系统,相比Hadoop,它在性能上比Hadoop要高100倍。Spark提供比Hadoop更上层的API, 同样的算法在Spark中实现往往只有Hadoop的1/10或者1/100的长度。Shark类似“SQL on Spark”,是一个在Spark上数据仓库的实现,在兼容Hive的情况下,性能最高可以达到Hive的一百倍。
Apache Spark 是在 Scala 语言中实现的,它将 Scala 用作其应用程序框架。与 Hadoop 不同,Spark 和 Scala 能够紧密集成,其中的 Scala 可以像操作本地集合对象一样轻松地操作分布式数据集。
2014年处, Apache 基金会宣布旗下的 Apache Spark 项目成为基金会的顶级项目,拥有顶级域名 http://spark.apache.org/。 Spark 的用户包括:阿里巴巴、Cloudera、Databricks、IBM、英特尔和雅虎等知名厂商。
Spark SQL是支持在Spark中使用Sql、HiveSql、Scaca中的关系型查询表达式。它的核心组件是一个新增的RDD类型SchemaRDD,它把 行对象用一个Schema来描述行里面的所有列的数据类型,它就像是关系型数据库里面的一张表。它可以从原有的RDD创建,也可以是Parquet文件, 最重要的是它可以支持用HiveQL从hive里面读取数据。
在2014年7月1日的Spark Summit上,Databricks宣布终止对Shark的开发,将重点放到Spark SQL上。在会议上,Databricks表示,Shark更多是对Hive的改造,替换了Hive的物理执行引擎,因此会有一个很快的速度。然而,不容 忽视的是,Shark继承了大量的Hive代码,因此给优化和维护带来了大量的麻烦。随着性能优化和先进分析整合的进一步加深,基于MapReduce设 计的部分无疑成为了整个项目的瓶颈。 详细内容请参看 Shark, Spark SQL, Hive on Spark, and the future of SQL on Spark
当前Spark SQL还处于alpha阶段,一些API在将将来的版本中可能会有所改变。
我也翻译几篇重要的Spark文档,你可以在我的网站找到。 Spark翻译文档
本文主要介绍了下面几个知识点:
- Spark读取文件夹的文件
- Spark filter和map使用
- Spark sql语句调用
- 自定义Spark sql的函数
提前讲一下,我也是最近才学习Spark及其相关的技术如Scala,下面的例子纯粹为了验证性的试验, 相信例子代码很很多优化的地方。
安装和配置Spark
当前最新的Spark版本为1.1.1, 因为我们以Standalone方式运行Spark,所以直接随便挑一个版本, 比如spark-1.1.1-bin-hadoop2.4.tgz, 解压到你的机器上。
我使用的CentOS 6.4。 具体来讲,它是我笔记本的一个虚拟机, 4个核, 4G内存。
在/opt解压它, 命令行中进入解压后的目录/opt/spark-1.1.1-bin-hadoop2.4。
运行./bin/spark-shell
就可以启动一个交互式的spark shell控制台, 在其中可以执行scala代码。
Spark初试
因为我们以本地单机的形式测试Spark, 你需要配置以下你的spark, 否则在分析大数据时很容易出现内存不够的问题。
在我的机器上, conf文件夹下复制一份spark-defaults.conf,将使用的内存增大一些:
启动shark-shell的时候设置使用4个核。
根据 Spark 快速入门 中的介绍运行个例子测试一下:
这个例子从Spark解压目录下的README.md文件创建一个RDD,并统计此文件有多少行。
再看一个抛针法计算PI值的例子。
结果为:
到目前为止,我们搭建好了一个Spark环境, 并简单进行了测试。 下一步我们使用Spark SQL分析前面所说的数据。
使用Spark SQL分析数据
这一步,我们使用Spark SQL按照星座对2000W数据进行分组统计, 看看哪个星座的人最喜欢开房。
当然, 使用纯Spark也可以完成我们的分析, 因为实际Spark SQL最终是利用Spark来完成的。
实际测试中发现这些数据并不是完全遵守一个schema, 有些数据的格式是不对的, 有些数据的数据项也是错误的。 在代码中我们要剔除那么干扰数据。
反正我们用这个数据测试者玩, 并没有严格的要求去整理哪些错误数据。
先看代码:
为了使用spark sql,你需要引入 sqlContext.createSchemaRDD
. Spark sql一个核心对象就是SchemaRDD
。 上面的import
可以隐式的将一个RDD转换成SchemaRDD。
接着定义了Customer
类,用来映射每一行的数据, 我们只使用每一行很少的信息, 像地址,email等都没用到。
接下来从2000W文件夹中读取所有的csv文件, 创建一个RDD并注册表customer。
因为没有一个内建的函数可以将出生一起映射为星座, 所以我们需要定义一个映射函数myfun
, 并把它注册到SparkContext中。 这样我们就可以在sql语句中使用这个函数。 类似地,字符串的length函数当前也不支持, 你可以增加一个这样的函数。 因为有的日期不正确,所有特别增加了一个”未知”的星座。 错误数据可能有两种, 一是日期出错, 而是此行格式不对,将其它字段映射成了出生日期。 我们在分析的时候忽略它们好了。
然后执行一个分组的sql语句。这个sql语句查询结果类型为SchemaRDD, 也继承了RDD所有的操作。
最后将结果打印出来。
看起来魔蝎座的人最喜欢开房了, 明显比其它星座的人要多。
我们也可以分析一下开房的男女比例:
结果显示男女开房的人数大约是2:1
[F,6475461] [M,12763926]来自:http://colobu.com/2014/12/11/spark-sql-quick-start/