谈谈分布式计算的算子层

jopen 10年前

本文是我对分布式计算的算子这层的一些认识和想法。因为最近自己的开发任务也是这方面相关的,公司内部有自研的类流式计算框架需要做一层算子层。我主要分析的是流式系统上实现算子这一点入手,对比现有计算框架和业界正在开展的项目,分析分析这件事的表面和背后深层的含义,以及可想象空间


趋势

Yahoo! 的Pig on Storm项目,让Pig-latin能够执行在Storm这种流式引擎上,最终使得Pig-latin能够混用于流式计算和批量计算场景下。应该说,无论是Spark,Summingbird,还是Pig,都在尝试做同一件事情:借助自己的DSL或原语在流式和批量两套引擎上表达(近)实时和离线数据处理能力


Spark本身依赖RDD,实现了Spark Streaming这种小批流计算,其DStream就是RDD,所以在Spark上写批量作业和流式作业API自然是统一的。


Summingbird在API层面统一了Storm上和Hadoop上的作业,对于Hadoop上任务的编写借助的是Cascading,属性上看更多的是一种适配的角色,虽然Summingbird也称为Lambda Architecture的一种解决方案。


总结:表面上看,DSL需要支持不同的计算引擎,以达到算子层面的混用,这是趋势。那么实现上的难度在哪呢?


挑战

在流式系统上实现pig-latin这种本身就诞生于批量计算场景里的DSL,对某些关系型操作会有语义层面的不清晰性,具体可以看Pig on Storm初步讨论。对于filter,foreach,union,甚至稍微复杂点的需要借助state的distinct,limit,在批量和流式场景下都是没有歧义的,实现起来不会有太大的区别或难度。但是像两流做sql语义里的join,或者多流做pig语义里的group,cross的时候,流式上的实现就不一致了,而且这个原语的定义也不同了


在流式系统上实现DSL或者一套FlumeJava,关键在能把UDAF给实现了。而要实现UDAF,就涉及到了跨批的事情。这件事情本质上需要引擎的支持,比如Trident有SpoutCoordinator作流控,还具备一定的事务性,那么在你要做跨批之间的UDAF的时候呢,可以借助 Trident的State,也就是辅助存储,调用persistAggregate这样的操作来完成。如果引擎不支持的话,比如原生Storm的接口,就没办法做流式DSL。


那么像Spark那样又不同,因为Spark本身不是流式系统,他的Spark Streaming上可以实现DSL,甚至可以和Spark SQL结合起来跑Streaming形式的SQL,原因是Spark是批量计算框架,所以他可以做类流式DSL。


总结:实现上看,流式系统上实现DSL难点在UDAF,本质上是跨批计算。那么流式上的跨批可以抽象为一种怎样的模式呢?


增量计算

增量计算,理论上可以包含批量计算,流式计算,也包括了迭代计算。怎么理解呢。增量计算可以表达为 newValue = function ( currentValue, oldValue ),而newValue被保存为oldValue与之后新来的currentValue继续产生关系,而这个不断传承下去的oldValue就是增量计算结果。

增量计算和前面提到的流式系统上实现算子有什么关系?这个增量的模型就是跨批计算的一种形式。function可以理解为一个算子,currentValue可以理解为本批计算结果,oldValue可以理解为UDAF的计算结果。

这个模型只有流式系统能实现吗?不是的,批量计算框架也可以做,大不了newValue每次都落盘嘛。如果Hadoop MR来做这件事情,其实是把每一次MR的数据当作一批,跨批的结果是额外保存的。如果RDD来做这件事情,那就不同了,上述这种模型很适合RDD来做,因为迭代计算可以看成是增量计算的一种,而RDD很擅长构建DAG来完成迭代计算,只是每次计算出来的都是immutable的新RDD。

流式系统怎么实现这种增量计算模型呢?这就是我们组之前老大和同事智慧的结晶了,具体不方便说。其实实现它不是难点,难点是计算框架内需要对oldValue进行容错。RDD不用担心容错,因为有lineage来记录,大不了可以重算,而且是可以并行的。Storm和Trident也不用担心容错,因为他把fail逻辑都交给用户了!而我们组目前的增量计算引擎完成了这件事情,并且一直在checkpoint的优化上做着努力。

总结:计算模型上,在流式系统上实现增量计算引擎,是实现丰富算子层,做流式SQL的一个必要条件。流式上实现的增量计算模型,有什么本质缺陷吗?

深入RDD

之前在杭州Spark meetup,分享Spark SQL的时候,我提到过Spark RDD最重要的两层意义:原语的丰富和数据表示能力前者使得Spark编程很easy,后者使得计算结果做到了reuse,适应了MR模型、迭代计算模型、BSP模型。基于这两点,Spark Core上可以轻松衍生出SQL产品、机器学习产品、图计算产品、流计算产品。

反观流式系统,比如Storm,原语要简单丰富易用不是难事,问题是你数据能reuse吗?!reuse有什么优点?拿RDD来说,节省内存空间以及并发的计算能力。RDD在设计之初就是immutable的,而且在计算内部消化掉了MapReduce,而暴露出丰富的Transformation和 Action。在论文中,RDD与DSM(Distributed Shared Memory)也进行了多维度的对比。应该说,Matei在设计RDD之前的参与Hadoop MapReduce源码的开发经验,加上当时其他系统内DSM的差异设计,以及Google FlumeJava,微软DryadLINQ在API层面的理念,最终揉合成了RDD这套东西。现在只有Spark现在实现了它。

最近我在增量计算引擎上实现的算子层,也是参考了FlumeJava,Trident,RDD设计出来的,还在测试中。就像我开头说的,Pig on Storm这件事情,换引擎是表面。背后意义是算子层面的混用,最终的想象空间是一层统一的DAG,上面承接Pig、Hive、SQL等DSL,下面对接不同的计算系统。实现起来是不困难的,困难点可能不是技术问题。


总结:RDD两个致命优点,easy to use和数据的reuse,是其他系统难达到的,特别是第二点,也是RDD的精髓所在。


对比Storm

marz做了Storm,ElephentDB之后,按照他的理解在how to beat CAP里提出了一种解决方案。在他提出的lambda achitecture里,Storm的定位在流式处理,而做类似ad-hoc的service layer是HBase。如果换做是我们目前的增量计算框架的愿景的话,我认为,流式和ad-hoc这层有望被增量计算引擎统一。为什么?


Query = Function(All Data)


Data静,Query动,是ad-hoc计算;Data动,Query静,是流式计算;Data动,Query动,是持续计算。 Storm处于第二者,增量计算框架可以做到第三者。Storm的拓扑提交是个严重问题,等Nimbus拉起bolt和spout的时候,黄花菜都凉了。它的确适合流式计算,为什么呢,因为流式的本质就是消息。Storm抽象的那层拓扑,bolt之间的消息通道,ack机制都很不错,这层抽象满足了流式计算,但是work这层以及调度这层远远不满足Query不断变化而仍需要流式计算的场景。我们现在做的框架将来会满足这件事情,从此统一了流式、批量、迭代,超越现在的流式计算,不仅仅是StreamSQL,Stream上的DSL都是可以通过算子层来实现的。


总结:Data动,Query动的场景如何统一解决?增量计算想象空间巨大,算子层重要性突显。


全文完 :)

来自:http://blog.csdn.net/pelick/article/details/39577785