Storm企业级应用:实战、运维和调优——1.1 什么是实时流计算

hrd888888 9年前

来自: http://www.adintellig.com/storm-in-action-1-1/

《Storm企业级应用:实战、运维和调优》原版授权,未经允许不得转载!

1.1    什么是实时流计算

所谓实时流计算就是近几年由于数据得到广泛应用之后,数据持久性建模不满足现状下,急需数据流的瞬时建模或者计算处理。这种实时计算的应用的实例有金融服务、网络监控、电信数据管理、Web应用、生产制造、传感检测等等。在这种数据流模型中,单独的数据单元可能是相关的元组(Tuple),例如网络测量、呼叫记录、网页访问等产生的数据。但是,这些数据以大量、快速、时变(可能是不可预知)的数据流持续到达,由此产生了一些基础性的新的研究问题——实时计算,实时计算的一个重要方向就是实时流计算。

1.1.1   实时流计算背景

数据的价值随着时间的流逝而降低,所以事件出现后必须尽快地对它们进行处理,最好数据出现时便立刻对其进行处理,发生一个事件进行一次处理,而不是缓存起来成一批处理。例如商用搜索引擎,像Google、Bing和Yahoo !等,通常在用户查询响应中提供结构化的Web结果,同时也插入基于流量的点击付费模式的文本广告。为了在页面上最佳位置展现最相关的广告,通过一些算法来动态估算给定上下文中一个广告被点击的可能性。上下文可能包括用户偏好、地理位置、历史查询、历史点击等信息。一个主搜索引擎可能每秒钟处理成千上万次查询,每个页面都可能会包含多个广告。为了及时处理用户反馈,需要一个低延迟、可扩展、高可靠的处理引擎。

对于这些实时性要求很高的应用,若把持续到达的数据简单地放到传统数据库管理系统(DBMS)中,并在其中进行操作,是不太切实的。传统的DBMS并不是为快速连续的存放单独的数据单元而设计的,而且也并不支持“持续处理”,而“持续处理”是数据流应用的典型特征。另外,现在人们都认识到,“近似性”和“自适应性”是对数据流进行快速查询和其他处理(如数据分析和数据采集)的关键要素,而传统DBMS的主要目标恰恰与之相反:通过稳定的查询设计,得到精确的答案。

另外一些方案是采用MapReduce来进行实时数据流处理。但是,尽管MapReduce做了实时性改进,仍然很难稳定地满足应用需求。这是因为Hadoop MapReduce框架为批处理做了高度优化,系统典型地通过调度批量任务来操作静态数据,任务不是常驻服务,数据也不是实时流入;而数据流计算的典型范式之一是不确定数据速率的事件流流入系统,系统处理能力必须与事件流量匹配。

1.1.2   实时计算应用场景

互联网领域的实时流计算一般都是针对海量数据进行的,除了像非实时计算的需求(如计算结果准确)以外,实时计算最重要的一个需求是能够实时响应计算结果,一般要求为秒级。个人理解,互联网行业的实时计算可以分为以下两种应用场景。

1) 数据源是实时的不间断的,要求对用户的响应时间也是实时的。

主要用于互联网流式数据处理。所谓流式数据是指将数据看作是数据流的形式来处理。数据流则是在时间分布和数量上无限的一系列数据记录的集合体;数据记录是数据流的最小组成单元。举个例子,对于大型网站,活跃的流式数据非常常见,这些数据包括网站的访问PV/UV、用户访问了什么内容,搜索了什么内容等。实时的数据计算和分析可以动态实时地刷新用户访问数据,展示网站实时流量的变化情况,分析每天各小时的流量和用户分布情况,这对于大型网站来说具有重要的实际意义。

2) 数据量大且无法或没必要预算,但要求对用户的响应时间是实时的。

主要用于特定场合下的数据分析处理。当数据量很大,同时发现无法穷举所有可能条件的查询组合或者大量穷举出来的条件组合无用的时候,实时计算就可以发挥作用,将计算过程推迟到查询阶段进行,但需要为用户提供实时响应。

1.1.3   实时计算处理流程

互联网上海量数据(一般为日志流)的实时计算过程可以被划分为以下三个阶段:数据的产生与收集阶段、传输与分析处理阶段、存储对对外提供服务阶段。下面分别进行简单的介绍如图1-1所示。

图1-1    实时计算处理流程

(1)           数据实时采集

需求:功能上保证可以完整的收集到所有日志数据,为实时应用提供实时数据;响应时间上要保证实时性、低延迟在1秒左右;配置简单,部署容易;系统稳定可靠等。

目前,互联网企业的海量数据采集工具,有非死book开源的 Scribe 、LinkedIn开源的 Kafka 、Cloudera开源的 Flume ,淘宝开源的 TimeTunnel 、Hadoop的 Chukwa 等,均可以满足每秒数百MB的日志数据采集和传输需求。

(2)           数据实时计算

传统的数据操作,首先将数据采集并存储在DBMS中,然后通过query和DBMS进行交互,得到用户想要的答案。整个过程中,用户是主动的,而DBMS系统是被动的,过程操作如图1-2所示。

图1-2    传统的数据操作流程

但是,对于现在大量存在的实时数据,比如股票交易的数据,这类数据实时性强,数据量大,没有止境,传统的架构并不合适。 流计算 就是专门针对这种数据类型准备的。在流数据不断变化的运动过程中实时地进行分析,捕捉到可能对用户有用的信息,并把结果发送出去。整个过程中,数据分析处理系统是主动的,而用户却处于被动接收的状态,处理流程如图1-3所示。

图1-3    流计算处理过程

需求:适应流式数据、不间断查询;系统稳定可靠、可扩展性好、可维护性好等。

有关计算的一些注意点:分布式计算,并行计算(节点间的并行、节点内的并行),热点数据的缓存策略,服务端计算。

(3)            实时查询服务

全内存:直接提供数据读取服务,定期dump到磁盘或数据库进行持久化。

半内存:使用Redis、Memcache、MongoDB、BerkeleyDB等内存数据库提供数据实时查询服务,由这些系统进行持久化操作。

全磁盘:使用HBase等以分布式文件系统(HDFS)为基础的NoSQL数据库,对于KeyValue内存引擎,关键是设计好Key的分布。

1.1.4   实时计算框架

最近这几年随着实时计算的流行,相继出现了一下实时计算的框架,以下做了部分总结。

1.       IBM的StreamBase

StreamBase[1]是IBM开发的一款商业流式计算系统,在金融行业和政府部门使用,其本身是商业应用软件,但提供了Develop Edition。相对于付费使用的Enterprise Edition,前者的功能更少,但这并不妨碍我们从外部使用和API接口来对StreamBase本身进行分析。

StreamBase使用Java开发,IDE是基于Eclipse进行二次开发,功能非常强大。StreamBase也提供了相当多的 Operator、Functor以及其他组件来帮助构建应用程序。用户只需要通过IDE拖拉控件,然后关联一下,设置好传输的Schema并且设置一下控件计算过程,就可以编译出一个高效处理的流式应用程序了。同时,StreamBase还提供了类SQL语言来描述计算过程。StreamBase的架构如图1-4所示。

图1-4    StreamBase架构图

StreamBase Server是节点上启动的管理进程,它负责管理节点上Container的实例,每个Container通过Adapter获得输入,交给应用逻辑进行计算,然后通过Adapter进行输出。各个Container相互连接,形成一个计算流图。

Adapter负责与异构输入或输出交互,源或目的地可能包括CSV文件、JDBC、JMS、Simulation(StreamBase提供的流产生模拟器)或用户定制。

每个StreamBase Server上面都会存在一个System Container,主要是产生系统监控信息的流式数据。

HA Container用于容错恢复,可以看出它实际包含两个部分:Heartbeat和HA Events,其中Heartbeat也是Tuple在Container之间传输。在HA方案下,HA Container监控Primary Server的活动情况,然后将这些信息转换成为HA Events交给StreamBase Monitor来处理。

Monitor就是从System Container和HA Container中获取数据并且进行处理。StreamBase认为HA 问题应该通过CEP方式处理,也就是说如果哪个部件出现问题,就肯定会反映在System Container和HA Container的输出流上面,然后 Monitor通过复杂事件处理这些Tuples的话就能够检测到机器故障等问题,并做出相应处理。

2.       Yahoo的S4

Yahoo! S4(Simple Scalable Streaming System)是一个通用的、分布式的、可扩展的、分区容错的、可插拔的流式系统[2]。基于S4框架,开发者可以容易地开发面向持续流数据处理的应用。S4的最新版本是v0.6.0,是Apache孵化项目,其设计特点有以下几个方面。

■   Actor计算模型:为了能在普通机型构成的集群上进行分布式处理,并且集群内部不使用共享内存,S4架构采用了Actor模式,这种模式提供了封装和地址透明语义,因此在允许应用大规模并发的同时,也提供了简单的编程接口。S4系统通过处理单元(Processing Elements,PEs)进行计算,消息在处理单元间以数据事件的形式传送,PE消费事件,发出一个或多个可能被其他PE处理的事件,或者直接发布结果。每个PE的状态对于其他PE不可见,PE之间唯一的交互模式就是发出事件和消费事件。

■   对等集群架构:S4采用对等架构,集群中的所有处理节点都是等同的,没有中心控制节点,这使得集群的扩展性很好,处理节点的总数理论上无上限;同时,S4将没有单点容错的问题。

■   可插拔体系架构:S4系统使用Java语言开发,采用了极富层次的模块化编程,每个通用功能点都尽量抽象出来作为通用模块,而且尽可能地让各模块实现可定制化。

■   支持部分容错:基于ZooKeeper服务的集群管理层将会自动路由事件从失效节点到其他节点。除非显式保存到持久性存储,否则节点故障时,节点上处理事件的状态会丢失。

S4的重要应用场景就是点击通过率(CTR)预估这类应用。CTR是广告点击数除以展现数得到的比率,当拥有了足够历史的展现和点击数据后,CTR是用户点击广告可能性的一个很好的估算,精确的来源点击对于个性化和搜索排名来说都价值无限。据S4的开发者称,在线流量上的实验显示基于S4系统的新CTR计算框架可以在不影响收入的前提下将CTR值提高3%,这主要是通过快速检测低质量的广告并把它们过滤出去而获得的收益。S4系统提供的低延迟处理能够使得商务广告部门获益,但是潜在的风险也不能忽视,那就是事件流的速率快到一定程度后,S4可能无法处理,会导致事件的丢失。如图1-5所示。

图1-5    S4在流量压力测试下的事件丢失情况

3.       推ter的Storm

推ter的Storm[3][4]:Storm是一个分布式的、容错的实时计算系统。Storm用途:可用于处理消息和更新数据库(流处理),在数据流上进行持续查询,并以流的形式返回结果到客户端(持续计算),并行化一个类似实时查询的热点查询(分布式的RPC)。

Storm为分布式实时计算提供了一组通用原语,可被用于“流处理”之中,实时处理消息并更新数据库。这是管理队列及工作者集群的另一种方式。 Storm也可被用于“连续计算”(continuous computation),对数据流做连续查询,在计算时就将结果以流的形式输出给用户。它还可被用于“分布式RPC”,以并行的方式运行昂贵的运算。

Storm的主要特点如下:

■   简单的编程模型。类似于MapReduce降低了并行批处理复杂性,Storm降低了进行实时处理的复杂性。

■   可以使用各种编程语言。可以在Storm之上使用各种编程语言。默认支持Clojure、Java、Ruby和Python。要增加对其他语言的支持,只需实现一个简单的Storm通信协议即可。

■   容错性。Storm会管理工作进程和节点的故障。

■   水平扩展。计算是在多个线程、进程和服务器之间并行进行的。

■   可靠的消息处理。Storm保证每个消息至少能得到一次完整处理。任务失败时,它会负责从消息源重试消息。

■   快速。系统的设计保证了消息能得到快速的处理,使用ØMQ作为其底层消息队列。

■   本地模式。Storm有一个“本地模式”,可以在处理过程中完全模拟Storm集群。这让你可以快速进行开发和单元测试。

4.       推ter的Rainbird

Rainbird是一款分布式实时统计系统,可以用于实时数据的统计:1统计网站中每一个页面,域名的点击次数;2内部系统的运行监控(统计被监控服务器的运行状态),3记录最大值和最小值。

Rainbird构建在Cassandra之上,使用Scala编写,依赖于ZooKeeper、Scribe和Thrift。每秒可以写入10万个事件,而且都带有层次结构,或者进行各种查询,延迟小于100ms。目前推ter已经在Promoted Tweets、微博中的链接、短地址、每个用户的微博交互等生产环境使用了Rainbird。其主要组件的功能描述如下。

■   ZooKeeper:Hadoop子项目中的一款分布式协调系统,用于控制分布式系统中各个组件中的一致性。

■   Cassandra:NoSQL中一款非常出色的产品,集合了Dynamo和BigTable特性的分布式存储系统,用于存储需要进行统计的数据,统计数据,并且提供客户端进行统计数据的查询。(需要使用分布式Counter补丁CASSANDRA-1072)

■   Scribe:非死book开源的一款分布式日志收集系统,用于在系统中将各个需要统计的数据源收集到Cassandra中。

■   Thrift:非死book开源的一款跨语言C/S网络通信框架,开发人员基于这个框架可以轻易地开发C/S应用。

5.       非死book 的Puma

Puma是非死book的数据流处理系统,早期的处理系统如图1-6所示,即二代Puma。PTail将数据以流的方式传递给Puma2,Puma2每秒需要处理百万级的消息,处理多为Aggregation方式的操作,遵循时间序列,涉及的复杂Aggregation操作诸如独立访次、最频繁事件等等。

对于每条消息,Puma2发送“Increment”操作到HBase。考虑到自动负载均衡、自动容错和写入吞吐等因素,Puma选择HBase而不是Mysql作为其存储引擎。Puma2的服务器都是对等的,也即同时可能有多个Puma2服务器向HBase中修改同一行数据。因此,非死book为HBase增加了新的功能,支持一条Increment操作修改同行数据的多个列。

图1-6    Puma2系统数据处理通路

Puma2的架构非常简单并且易于维护,其涉及的状态仅仅是PTail的Checkpoint ,即上游数据位置,周期性地存储在HBase中。由于是对称结构,集群扩容和机器故障时的处理都非常方便。不过,Puma2的缺点也很突出,首先,HBase的 Increment操作是非常昂贵的,因为它涉及读和写,而HBase的随机读效率是比较差;另外,复杂Aggregation操作也不好支持,需要在HBase上写很多用户代码;再者,Puma2在故障时会产生少量重复数据,因为HBase的Increment和PTail的Checkpoint并不是一个原子操作。

但值得一提的是Puma并没有开源出来,读者可以了解和借鉴其实现原理。

6.       阿里的JStorm

JStorm是一个Alibaba开源的分布式实时计算引擎,可以认为是推ter Storm的Java版本,用户按照指定的接口实现一个任务,然后将这个任务递交给JStorm系统,JStorm会启动后台服务进程7 * 24小时运行,一旦某个Worker 发生故障,调度器立即分配一个新的Worker替换这个失效的Worker。

JStorm处理数据的方式是基于消息的流水线处理, 因此特别适合无状态计算,也就是计算单元的依赖的数据全部在接受的消息中可以找到, 并且最好一个数据流不依赖另外一个数据流。因此,JStorm适用于下面的场景中:

■   日志分析。从日志中分析出特定的数据,并将结果存入外部存储器如数据库。

■   管道系统。 将一个数据从一个系统传输到另外一个系统,比如将数据库同步到Hadoop。

■   消息转化器。将接受到的消息按照某种格式进行转化,存储到另外一个系统如消息中间件。

■   统计分析器。从日志或消息中,提炼出某个字段,然后做COUNT或SUM计算,最后将统计值存入外部存储器。

但是,JStorm的活跃度并不高,截止到本章书写的时间,整个JStorm项目共提交过36次,并且只有1个Committer,相比推ter Storm,不管是活跃度、认可度上还不是一个数量级的产品。

7.       其他实时计算系统

(1)              HStreaming

HStreaming[5]是建立在Hadoop上的可扩展的、可持续的数据分析系统。它可以分析、可视化并处理大量连续数据比如一个金融交易系统实时展示数据图。HStreaming由Jana Uhlig与Volkmar Uhlig联合创立,该公司没有提供相关产品的开源版本,从官网信息来看只提供相关的解决方案。

HStreaming公司尝试为Hadoop环境添加一个实时的组件,当数据提交到系统,在存储到磁盘之前就会进行数据的处理,类似开源的Storm和Kafka。目前HStreaming已经建立了一个完整的系统,该系统能够利用实时的引擎来处理视频、服务器、传感器以及其他机器上生成的数据流。而且完全兼容Hadoop作为一个归档和批量处理系统。

(2)              Esper

Esper[6]是EsperTech公司使用Java开发的事件流处理(ESP:Event Stream Processing)和复杂事件处理(CEP:Complex Event Processing)引擎。CEP是一种实时事件处理并从大量事件数据流中挖掘复杂模式的技术。ESP是一种从大量事件数据流中过滤,分析有意义的事件,并能够实时取得这些有意义的信息的技术。该引擎可应用于网络入侵探测,SLA监测,RFID读取,航空运输调控,金融方面(风险管理、欺诈探测)等领域。Esper可以用在股票系统、风险监控系统等等要求实时性比较高的系统中。

(3)    Borealis

Brandeis University、Brown University和MIT合作开发的一个分布式流式系统,由之前的流式系统Aurora、Medusa演化而来,学术研究的一个产品,08年已经停止维护。

Borealis具有丰富的论文、完整的用户/开发者文档,系统是C++实现的,运行于x86-based Linux平台。系统是开源的,同时使用了较多的第三方开源组件,包括用于查询语言翻译的ANTLR、C++的网络编程框架库NMSTL等。

Borealis系统的流式模型和其他流式系统基本一致:接受多元的数据流和输出,为了容错,采用确定性计算,对于容错性要求高的系统,会对输入流使用算子进行定序。

8.       框架对比

实时数据流计算是近年来分布式、并行计算领域研究和实践的重点,无论是工业界还是学术界,诞生了多个具有代表性的数据流计算系统,用于解决实际生产问题和进行学术研究。不同的系统满足不同应用的需求,系统并无好坏之分,关键在于服务的对象是谁。图8-5从各个方面比较了典型的三个数据流计算系统Puma、Storm和S4,因为StreamBase是厂商发行商用版本,HStreaming只提供解决方案,而JStorm和Storm非常相似,所以这几种产品并没有罗列在图1-7中。

图1-7    Puma、Storm和S4三种数据流计算系统对比

图1-7从开发语言、高可用机制、支持精确恢复、主从架构、资源利用率、恢复时间、支持状态持久化及支持去重等几个方面对这三种系统进行了对比。可以看到,为了高效开发,两个系统使用Java,另一种系统使用函数式编程语言Clojure;高可用方案,有两个系统使用Passive Standby方式,系统恢复时间可控,但系统复杂度增加,资源使用率也较低,因为需要一些机器用来当备机;而Storm选择了更简单可行的上游回放方式,资源使用率高,就是恢复时间可能稍长些;Puma和S4都支持状态持久化,但S4目前不支持数据去重,未来可能会实现;三个系统都做不到精确恢复,即恢复后的执行结果和无故障发生时保持一致,因为即使是Passive Standby方式,也只是定期Checkpoint,并没有跟踪每条消息的执行。商用的StreamBase支持精确恢复,这主要应用于金融领域。

[1]有关该实现的详细资料,请参见官网http://www.streambase.com

[2]有关该实现的详细资料,请参见http://s4.io/。

[3]有关该实现的详细资料,请参见http://www.slideshare.net/kevinweil/rainbird-realtime-analytics-at-推ter-strata-2011。

[4]官方指南请参见https://Storm.canonical.com/Tutorial。

[5]有关该实现的详细资料,请参见http://www.hstreaming.com/technology/hstreaming/。

[6]官方网站请参见http://www.espertech.com/。