Spark2: 对RDD进行编程系列
首先,什么是RDD?
1 官方定义
Resilient Distributed Dataset (RDD)
Each RDD is split into multiple partitions, which may be computed on different nodes of the cluster.其实就是说一个数据集,比如吧,一个100G的大文件,就是一个RDD,但是它是分布式的,
也就是说会分成若干块,每块会存在于集群中的一个或者多个节点上。
简单来说,就是分而存之。
2 持久化
只要你需要,你可以把这个RDD持久化,语法就是 RDD.persist()。
RDD中的一下概念
Transformations are operations on RDDs that return a new RDD.比如
val inputRDD = sc.textFile("log.txt")
val errorsRDD = inputRDD.filter(line => line.contains("error"))
这里只是重新生成了一个RDD集合,如果你在inputRDD基础上生成了2个集合,
你可以用union()来达到并集的目的!
Actions
比如
println("Here are 10 examples:")badLinesRDD.take(10).foreach(println)
意思就是取出前10条分别打印!
collect()可以用来检索整个RDD,但是保证结果可以放在一个机器的内存里,所以collect()不适合处理大量的数据集。
saveAsTextFileaction和 saveAsSequenceFile可以用来保存文件到外部文件系统中!
----------示意图
再来几个scala的例子
class SearchFunctions(val query: String) {
def isMatch(s: String): Boolean = {
s.contains(query)
}
def getMatchesFunctionReference(rdd: RDD[String]): RDD[String] = {
// Problem: "isMatch" means "this.isMatch", so we pass all of "this"
rdd.map(isMatch)
}
def getMatchesFieldReference(rdd: RDD[String]): RDD[String] = {
// Problem: "query" means "this.query", so we pass all of "this"
rdd.map(x => x.split(query))
}
def getMatchesNoReference(rdd: RDD[String]): RDD[String] = {
// Safe: extract just the field we need into a local variable
val query_ = this.query
rdd.map(x => x.split(query_))
}
}
---
关于map
The map transformation takes in a function and applies it to each
element in the RDD with the result of the function being the new value of each element
in the resulting RDD.
意思很简单,自己体会即可!
---
关于map和 filter
例子:
val input = sc.parallelize(List(1, 2, 3, 4))
val result = input.map(x => x*x)
println(result.collect())
例子2:
val lines = sc.parallelize(List("hello world", "hi"))
val words = lines.flatMap(line => line.split(" "))
words.first() // returns "hello"
======================================
其它一些操作示意图:
union包含重复的,intersection去掉重复的
也可以做一个笛卡尔乘积:
来自:http://my.oschina.net/qiangzigege/blog/314140