Spark Streaming 结合 Kafka 两种不同的数据接收方式比较
MilBrinkley
8年前
<p>DirectKafkaInputDStream 只在 driver 端接收数据,所以继承了 InputDStream,是没有 receivers 的</p> <p>在结合 Spark Streaming 及 Kafka 的实时应用中,我们通常使用以下两个 API 来获取最初的 DStream(这里不关心这两个 API 的重载):</p> <pre> <code class="language-python">KafkaUtils#createDirectStream</code></pre> <p>及</p> <pre> <code class="language-python">KafkaUtils#createStream</code></pre> <p>这两个 API 除了要传入的参数不同外,接收 kafka 数据的节点、拉取数据的时机也完全不同。本文将分别就两者进行详细分析。</p> <h2>KafkaUtils#createStream</h2> <p>先来分析 createStream ,在该函数中,会新建一个 KafkaInputDStream 对象, KafkaInputDStream 继承于 ReceiverInputDStream 。</p> <ol> <li>继承ReceiverInputDStream的类需要重载 getReceiver 函数以提供用于接收数据的 receiver</li> <li>recever 会调度到某个 executor 上并启动,不间断的接收数据并将收到的数据交由 ReceiverSupervisor 存成 block 作为 RDD 输入数据</li> </ol> <p>KafkaInputDStream当然也实现了getReceiver方法,如下:</p> <pre> <code class="language-python">def getReceiver(): Receiver[(K, V)] = { if (!useReliableReceiver) { //< 不启用 WAL new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel) } else { //< 启用 WAL new ReliableKafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel) } }</code></pre> <p>根据是否启用 WAL,receiver 分为 KafkaReceiver 和 ReliableKafkaReceiver。 </p> <ol> <li>receiver 是如何被分发启动的</li> <li>receiver 接受数据后数据的流转过程<br> 并在 揭开Spark Streaming神秘面纱③ - 动态生成 job 一文中详细介绍了</li> <li>receiver 接受的数据存储为 block 后,如何将 blocks 作为 RDD 的输入数据</li> <li>动态生成 job</li> </ol> <p>以上两篇文章并没有具体介绍 receiver 是如何接收数据的,当然每个重载了 ReceiverInputDStream 的类的 receiver 接收数据方式都不相同。下图描述了 KafkaReceiver 接收数据的具体流程:</p> <p style="text-align:center"><img src="https://simg.open-open.com/show/5b0e1c88e03c328f326c3596067854d9.jpg"></p> <h2>KafkaUtils#createDirectStream</h2> <p>在 揭开Spark Streaming神秘面纱③ - 动态生成 job 中,介绍了在生成每个 batch 的过程中,会去取这个 batch 对应的 RDD,若未生成该 RDD,则会取该 RDD 对应的 blocks 数据来生成 RDD,最终会调用到 DStream#compute(validTime: Time) 函数,在 KafkaUtils#createDirectStream 调用中,会新建 DirectKafkaInputDStream , DirectKafkaInputDStream#compute(validTime: Time) 会从 kafka 拉取数据并生成 RDD,流程如下:</p> <p style="text-align:center"><img src="https://simg.open-open.com/show/1859b39dbab9b3822e55e4f44801c013.jpg"></p> <p>如上图所示,该函数主要做了以下三个事情:</p> <ol> <li>确定要接收的 partitions 的 offsetRange,以作为第2步创建的 RDD 的数据来源</li> <li>创建 RDD 并执行 count 操作,使 RDD 真实具有数据</li> <li>以 streamId、数据条数,offsetRanges 信息初始化 inputInfo 并添加到 JobScheduler 中</li> </ol> <p>进一步看 KafkaRDD 的 getPartitions 实现:</p> <pre> <code class="language-python">override def getPartitions: Array[Partition] = { offsetRanges.zipWithIndex.map { case (o, i) => val (host, port) = leaders(TopicAndPartition(o.topic, o.partition)) new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port) }.toArray }</code></pre> <p>从上面的代码可以很明显看到,KafkaRDD 的 partition 数据与 Kafka topic 的某个 partition 的 o.fromOffset 至 o.untilOffset 数据是相对应的,也就是说 KafkaRDD 的 partition 与 Kafka partition 是一一对应的</p> <p>通过以上分析,我们可以对这两种方式的区别做一个总结:</p> <ol> <li>createStream会使用 Receiver;而createDirectStream不会</li> <li>createStream使用的 Receiver 会分发到某个 executor 上去启动并接受数据;而createDirectStream直接在 driver 上接收数据</li> <li>createStream使用 Receiver 源源不断的接收数据并把数据交给 ReceiverSupervisor 处理最终存储为 blocks 作为 RDD 的输入,从 kafka 拉取数据与计算消费数据相互独立;而createDirectStream会在每个 batch 拉取数据并就地消费,到下个 batch 再次拉取消费,周而复始,从 kafka 拉取数据与计算消费数据是连续的,没有独立开</li> <li>createStream中创建的KafkaInputDStream 每个 batch 所对应的 RDD 的 partition 不与 Kafka partition 一一对应;而createDirectStream中创建的 DirectKafkaInputDStream 每个 batch 所对应的 RDD 的 partition 与 Kafka partition 一一对应</li> </ol> <p> </p> <p>来自:http://www.jianshu.com/p/60344796f8a5</p> <p> </p>