Spark学习-RDD编程基础

风云决 8年前
   <h2>1. RDD基础概念</h2>    <p>Spark上开发的应用程序都是由一个driver programe构成,这个所谓的驱动程序在Spark集群通过跑main函数来执行各种并行操作。集群上的所有节点进行并行计算需要共同访问一个分区元素的集合,这就是 RDD(RDD resilient distributed dataset)弹性分布式数据集 。RDD可以存储在内存或磁盘中,具有一定的容错性,可以在节点宕机重启后恢复。 在Spark 中, 对数据的所有操作不外乎创建RDD、转化已有RDD 以及调RDD 操作进行求值。而在这一切背后,Spark 会自动将RDD 中的数据分发到集群上,并将操作并行化执行。</p>    <h2>2. 创建RDD</h2>    <p>创建RDD有两种方式:一种是通过并行化驱动程序中的已有集合创建,另外一种方法是读取外部数据集。</p>    <h3>2.1 并行化</h3>    <p>一种非常简单的创建RDD的方式,将程序中的一个集合传给 SparkContext 的 parallelize() 方法。 <strong>python中的操作(pyspark打开shell):</strong></p>    <pre>  <code class="language-python">>>> data = [1, 2, 3, 4, 5]  >>> distData = sc.parallelize(data)  # 对RDD进行测试操作  # 对集合中的所有元素进行相加,返回结果为15  >>> distData.reduce(lambda a, b: a + b)  15</code></pre>    <p>scala中的操作(spark-shell打开shell):</p>    <pre>  <code class="language-python">scala> val data = Array(1,2,3,4,5)  data: Array[Int] = Array(1, 2, 3, 4, 5)    scala> val distData = sc.parallelize(data)  distData: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:23    scala> distData.reduce((a,b)=>a+b)  res0: Int = 15</code></pre>    <h3>2.2 读取外部数据集</h3>    <p>Spark可以从任何Hadoop支持的存储上创建RDD,比如本地的文件系统,HDFS,Cassandra等。Spark可以支持文本文件,SequenceFiles等。</p>    <p>这种方法更为常用。</p>    <p>python:</p>    <pre>  <code class="language-python"># 从protocols文件创建RDD  distFile = sc.textFile("/etc/protocols")</code></pre>    <p>scala:</p>    <pre>  <code class="language-python">// 从protocols文件创建RDD  val distFile = sc.textFile("/etc/protocols")</code></pre>    <h2>RDD操作</h2>    <p>RDD支持两种操作:</p>    <ol>     <li>转换(transformations):将已存在的数据集转换成新的数据集,例如map。 转换是惰性的,不会立刻计算结果,仅仅记录转换操作应用的目标数据集,当动作需要一个结果时才计算。</li>     <li>动作(actions) :数据集计算后返回一个值给驱动程序,例如reduce</li>    </ol>    <p>转换操作是懒惰的,举个例子:</p>    <pre>  <code class="language-python">>>> lines = sc.textFile("README.md")  >>> pythonLines = lines.filter(lambda line: "Python" in line)  >>>> pythonLines.first()</code></pre>    <p>如果Spark 在我们运行lines = sc.textFile(…) 时就把文件中所有的行都读取并存储起来,就会消耗很多存储空间,而我们马上就要筛选掉其中的很多数据。相反, 一旦Spark 了解了完整的转化操作链之后,它就可以只计算求结果时真正需要的数据。 事实上,在行动操作first() 中,Spark 只需要扫描文件直到找到第一个匹配的行为止,而不需要读取整个文件。</p>    <p>下面引用《Spark快速大数据分析》的一段话:</p>    <p>我们不应该把RDD 看作存 放着特定数据的数据集,而最好把每个RDD 当作我们通过转化操作构建出来的、记录如 何计算数据的指令列表。把数据读取到RDD 的操作也同样是惰性的。因此,当我们调用 sc.textFile() 时,数据并没有读取进来,而是在必要时才会读取。和转化操作一样的是, 读取数据的操作也有可能会多次执行。</p>    <p>简单的例子理解转换和行动操作:</p>    <p>python:</p>    <pre>  <code class="language-python"># 从protocols文件创建RDD  distFile = sc.textFile("/etc/protocols")    # Map操作,每行的长度,转换操作  lineLengths = distFile.map(lambda s: len(s))    # Reduce操作,获得所有行长度的和,即文件总长度,这里才会执行map运算  totalLength = lineLengths.reduce(lambda a, b: a + b)    # 可以将转换后的RDD保存到集群内存中  lineLengths.persist()</code></pre>    <p>scala:</p>    <pre>  <code class="language-python">// 从protocols文件创建RDD  val distFile = sc.textFile("/etc/protocols")    // Map操作,每行的长度  val lineLengths = distFile.map(s => s.length)    // Reduce操作,获得所有行长度的和,即文件总长度,这里才会执行map运算  val totalLength = lineLengths.reduce((a, b) => a + b)    // 可以将转换后的RDD保存到集群内存中  lineLengths.persist()</code></pre>    <h3>常见的转换和行动操作</h3>    <p>如下图所示:</p>    <p style="text-align:center"><img src="https://simg.open-open.com/show/3a5f20b956052323881a82e94f74260e.png"> <img src="https://simg.open-open.com/show/732161dbe83638d2b04d0ee25bb80ba9.png"></p>    <h2>4. 向Spark传递函数</h2>    <h3>4.1 python</h3>    <ol>     <li>匿名函数:lambda的大量使用,让简单的函数写成表达式的样子。</li>     <li>模块里的顶层函数</li>     <li>Spark中已调用函数中定义的本地函数</li>    </ol>    <pre>  <code class="language-python"># 定义需要传递给Spark的函数  def myFunc(s):      words = s.split(" ")      return len(words)  # 将词数统计函数传递给Spark  sc.textFile("/etc/protocols").map(myFunc).reduce(lambda a,b:a+b)</code></pre>    <p> </p>    <p>上述中定义的函数为计算一行字符串中单词的个数, sc.textFile("/etc/protocols").map(myFunc).reduce(lambda a,b:a+b)</p>    <p>表明先读取内容转出RDD,然后转换map()是对每一行都进行统计操作,然后进行行动操作,用lambda匿名函数计算所有行的单词和。</p>    <h3>4.2 scala</h3>    <ol>     <li>匿名函数:常用于短小的代码片段。</li>     <li>全局单例对象中的静态函数:例如,你可以定义一个object MyFunctions,然后将MyFunctions.func1传入,就像下面这样:</li>    </ol>    <pre>  <code class="language-python">//创建一个单例对象MyFunctions  object MyFunctions {    def func1(s: String): Int = {s.split(" ").length}  }  val lineLengths = sc.textFile("/etc/protocols").map(MyFunctions.func1)  val count = lineLengths.reduce((a,b)=>a+b)</code></pre>    <p>与python几乎一样,注意其匿名函数的写法。</p>    <p>参考资料:</p>    <p>1. 《spark快速大数据分析》</p>    <p>2. <a href="/misc/goto?guid=4959741101876863374" rel="nofollow,noindex">https://www.shiyanlou.com/courses/456/labs/1462/document</a></p>    <p> </p>    <p>来自:http://blog.csdn.net/taoyanqi8932/article/details/60972046</p>    <p> </p>