含 Apache Spark 的 Lambda 架构

wwang1969 8年前
   <h2><strong>目标</strong></h2>    <p>市场上很多玩家已经建造了MapReduce工作流用来日常处理兆兆字节的历史数据。但是谁愿意等待24小时来拿到更新后的分析报告?这篇文章会向你介绍 Lambda Architecture ,它被设计出来既可以利用批量处理方法,也可以使用流式处理方法。这样我们就可以利用Apache Spark(核心, SQL, 流),Apache Parquet,推ter Stream等工具处理实时流式数据,实现对历史数据的快速访问。代码简洁干净,而且附上直接明了的实例! </p>    <h2><strong>Apache Hadoop: 简要历史</strong></h2>    <p><img src="https://simg.open-open.com/show/491a6d7c7e8463507662ee06f047aebf.png"></p>    <p>Apache Hadoop的丰富历史开始于大约2002年。Hadoop是Doug Cutting创立的, 他也是Apache Lucene这一被广泛使用的文本检索库的创造者. Hadoop的起源与Apache Nutch有关, Apache Nutch是一个开源的web搜索引擎 , 本身也是Lucene项目的一部分. Apache Nutch在大约10年前成为一个独立的项目 .</p>    <p>事实上,许多用户实现了成功的基于HadoopM/R的通道,一直运行到现在.现实生活中我至少能举出好几个例子:</p>    <ul>     <li> <p>Oozie协调下的工作流 每日运行和处理多达8TB数据并生成分析报告</p> </li>     <li> <p>bash管理的工作流每日运行和处理多达8TB数据并生成分析报告</p> </li>    </ul>    <h2><strong>现在是2016年了!</strong></h2>    <p>商业现实已经改变,所以做出长远的决定变得更有价值。除此以外,技术本身也在演化进步。Kafka, Storm, Trident, Samza, Spark, Flink, Parquet, Avro, Cloud providers等时髦的技术被工程师们和在商业上广泛使用.</p>    <p>因此, 现代基于Hadoop的 M/R通道 (以及Kafka,现代的二进制形式如Avro和数据仓库等。在本例中Amazon Redshift用作ad-hoc查询) 可能看起来像这样:</p>    <p style="text-align: center;"><img src="https://simg.open-open.com/show/4af3b41407a9af66f9052e2a0a20c8b6.png"></p>    <p>以上M/R通道看起来很不错,但是它仍然是传统上具有许多缺点的批处理。由于在新数据不断进入系统时,批处理过程通常需要花费很多时间来完成,它们主要是提供给终端用户的乏味的数据 罢了。</p>    <h2><strong>Lambda 架构</strong></h2>    <p>Nathan Marz 为通用,可扩展和容错性强的数据处理架构想出了一个术语 Lambda架构 。这个数据架构结合了批处理和流处理方法的优点来处理大批量数据。</p>    <p>我强烈推荐阅读 Nathan Marz  的书 ,这本书从源码角度对Lambda架构进行了完美的诠释。</p>    <h3><strong>层结构</strong></h3>    <p>从顶层来看,这是层的结构:</p>    <p style="text-align: center;"><img src="https://simg.open-open.com/show/d2764a41c9f703f07052a0d539c1df0f.png"></p>    <p>所有进入系统的数据被分配到了批处理层和高速层来处理。批处理层管理着主数据集(一个不可修改,只能新增的原始数据)和预计算批处理视图。服务层索引批处理视图,因此可以对它们进行低延时的临时查询。高速层只处理近期的数据。任何输入的查询结果都合并了批处理视图和实时视图的查询结果。</p>    <h3><strong>焦点</strong></h3>    <p>许多工程师认为 Lambda架构 就包含这些层和定义数据流程,但是 Nathan Marz 在他的书中 把焦点放在了其他重要的地方,如:</p>    <ul>     <li> <p>分布式思想</p> </li>     <li> <p>避免增量架构</p> </li>     <li> <p>关注数据的不可变性</p> </li>     <li> <p>创建再计算算法</p> </li>    </ul>    <h3><strong>数据的相关性</strong></h3>    <p><img src="https://simg.open-open.com/show/216797bf7a5d49e6144daa026821f6e2.png"></p>    <p>正如前面所提到的,任何输入的查询结果都会从批处理视图和实时视图的查询结果返回,因此这些视图需要被合并。在这里,需要注意的一点是,一个实时视图是上一个实时视图和新的数据增量的函数,因此一个增量算法可以在这里使用。批处理视图是所有数据的视图,因此再计算算法可以在这里使用。</p>    <h3><strong>均衡取舍</strong></h3>    <p>我们生活中的一切问题都存在权衡,Lambda架构( Lambda Architecture )不例外。 通常,我们需要解决几个主要的权衡:</p>    <ul>     <li> <p>完全重新计算vs.部分重新计算</p>      <ul>       <li> <p>某些情况下,可以考虑使用Bloom过滤器来避免完全重新计算</p> </li>      </ul> </li>     <li> <p>重算算法 vs. 增量算法</p>      <ul>       <li> <p>使用增量算法是个很大的诱惑,但参考指南,我们必须使用重算算法,即使它更难得到相同的结果</p> </li>      </ul> </li>     <li> <p>加法算法 vs. 近似算法</p>      <ul>       <li> <p>Lambda Architecture 能与加法算法很好地协同工作。 因此,在另一种情况下,我们需要考虑使用近似算法,例如,使用HyperLogLog处理 count-distinct 的问题等。</p> </li>      </ul> </li>    </ul>    <h3><strong>实现</strong></h3>    <p>有许多实现Lambda架构的方法,因为对于每个层的底层解决方案是非常独立的。每个层需要底层实现的特定功能,有助于做出更好的选择并避免过度决策:</p>    <ul>     <li> <p>批量层(Batch Layer):写一次,批量读取多次</p> </li>     <li> <p>服务层(Serving layer):随机读取,不支持随机写入,批量计算和批量写入</p> </li>     <li> <p>速度层(Speed layer):随机读取,随机写入;增量计算</p> </li>    </ul>    <p>例如,其中一个实现方案的构成(使用Kafka,Apache Hadoop,Voldemort,推ter Storm,Cassandra)可能如下图所示:</p>    <p><img src="https://simg.open-open.com/show/b188963c35889303f5687af5bd0d658c.png"></p>    <p><strong>Apache Spark</strong></p>    <p>Apache Spark 可以被认为是用于 Lambda 架构各层的集成解决方案。其中,Spark Core 包含了高层次的API和优化的支持通用图运算引擎,Spark SQL 用于SQL和结构化数据处理、Spark Streaming 可以解决高拓展、高吞吐、容错的实时流处理。在批处理中使用Spark可能小题大做,而且不是所有方案和数据集都适用。但除此之外,Spark算是对 Lambda Architecture 的合理的实现。</p>    <h3><strong>示例应用</strong></h3>    <p>下面通过一些路径创建一个示例应用,以展示 Lambda Architecture ,其主要目的是 提供 #morningatlohika tweets(一个由我 在 Lviv, Ukraine发起的 本地技术演讲 ,)这个hash标签的统计:包括之前到今天 <strong>这一刻</strong> 的所有时间。</p>    <p><strong>批处理视图(Batch View)</strong></p>    <p>简单地说,假定我们的主数据集包含自开始时间以来的所有更新。 此外,我们已经实现了一个批处理,可用于创建我们的业务目标所需的批处理视图,因此我们有一个预计算的批处理视图,其中包含所有与 #morningatlohika 相关的标签 统计信息:  </p>    <pre>  <code class="language-groovy">apache – 6  architecture – 12  aws – 3  java – 4  jeeconf – 7  lambda – 6  morningatlohika – 15  simpleworkflow – 14  spark – 5</code></pre>    <p>编号很容易记住,因为,为方便查看,我使用对应标签的英文单词的字母数目作为编号。</p>    <p><strong>实时视图</strong></p>    <p>假设应用程序启动后,同时有人发如下tweet:</p>    <pre>  <code class="language-groovy">“Cool blog post by @tmatyashovsky about  #lambda #architecture using #apache #spark at #morningatlohika”</code></pre>    <p>此时,正确假设应用程序启动后,同时有人发如下tweet:的实时视图应该包含如下的hash标签和统计数据(本例中都是1,因为每个hash标签只用了一次):</p>    <pre>  <code class="language-groovy">apache – 1  architecture – 1  lambda – 1  morningatlohika – 1  spark – 1</code></pre>    <p><strong>查询</strong></p>    <p>当终端用户查询出现是,为了给全部hash标签返回实时统计结果,我们只需要合并批处理视图和实时视图。所以,输出如下所示编码(hash标签的正确统计数据都加了1):</p>    <pre>  <code class="language-groovy">apache – 7  architecture – 13  aws – 3  java – 4  jeeconf – 7  lambda – 7  morningatlohika – 16  simpleworkflow – 14  spark – 6</code></pre>    <p><strong>场景</strong></p>    <p>示例中的场景可以简化为如下步骤:</p>    <ul>     <li> <p>用Apache Spark创建批处理视图(<em>.parquet</em>)</p> </li>     <li> <p>在Spark中缓存批处理视图</p> </li>     <li> <p>将流处理应用连接到推ter</p> </li>     <li> <p>实时监视包含<em>#morningatlohika 的tweets</em></p> </li>     <li> <p>构造增量实时视图</p> </li>     <li> <p>查询,即,即时合并批处理视图和实时视图</p> </li>    </ul>    <p><strong>技术细节</strong></p>    <p>此源代码是基于Apache Spark 1.6.x(注:再引入结构流之前)。 Spark Streaming架构是纯微型批处理架构: </p>    <p style="text-align: center;"><img src="https://simg.open-open.com/show/fd4b45c3fbe0a11681f64a2b997be99f.png" alt="含 Apache Spark 的 Lambda 架构" width="1068" height="584"></p>    <p>所以当我处理一个流媒体应用程序时,我使用<em>DStream</em>来连接使用<em>推terUtils</em>的推ter:</p>    <pre>  <code class="language-groovy">JavaDStream<Status> 推terStatuses = 推terUtils.createStream(javaStreamingContext,                                                                  create推terAuthorization(),                                                                   new String[]{推terFilterText});  </code></pre>    <p>在每个微批次中(使用可配置的批处理间隔),我正在对新tweets中的hashtags统计信息进行计算,并使用<em>updateStateByKey()</em>状态转换函数来更新实时视图的状态。简单地说,就是使用临时表将实时视图存储在存储器中。</p>    <p>查询服务反映了批处理的合并过程和通过代码表示的<em>DataFrame</em>实时视图:</p>    <pre>  <code class="language-groovy">DataFrame realTimeView = streamingService.getRealTimeView();  DataFrame batchView = servingService.getBatchView();  DataFrame mergedView = realTimeView.unionAll(batchView)                                     .groupBy(realTimeView.col(HASH_TAG.getValue()))                                     .sum(COUNT.getValue())                                     .orderBy(HASH_TAG.getValue());     List<Row> merged = mergedView.collectAsList();     return merged.stream()     .map(row -> new HashTagCount(row.getString(0), row.getLong(1)))     .collect(Collectors.toList());</code></pre>    <h3><strong>成果</strong></h3>    <p>在简化的方案下,文章开头提到的基于Hadoop 的M/R 管道可以通过Apache Spark进行如下优化:</p>    <p><img src="https://simg.open-open.com/show/cc7caf651757759dcd4ad2dc88ab0a61.png"></p>    <h2><strong>本章结语</strong></h2>    <p>正如上文提到的 Lambda架构 有优点和缺点,所以结果就是有支持者和反对者。一些人会说批处理视图和实时视图有很多重复的逻辑,因为最终他们需要从查询的角度创建出可以合并的视图。因此,他们创建了 Kappa 架构——一个 Lambda 架构的简化方案。 Kappa架构的系统去掉了批处理系统,取而代之的是数据从流处理系统中快速通过:</p>    <p style="text-align: center;"><img src="https://simg.open-open.com/show/933bc12facba60500804f58107453a6e.png"></p>    <p>即便在此场景中,Spark也能发挥作用,比如,参与流处理系统:</p>    <p style="text-align: center;"><img src="https://simg.open-open.com/show/7b81c556cb86b660f95f9dbf029f39c0.png"></p>    <p> </p>    <p>来自:https://www.oschina.net/translate/lambda-architecture-with-apache-spark</p>    <p> </p>