推ter Storm 实时数据处理框架分析总结

fmms 13年前
     Storm是推ter开源的一个类似于Hadoop的实时数据处理框架(原来是由BackType开发,后BackType被推ter收购,将Storm作为推ter的实时数据分析)。实时数据处理的应用场景很广泛,如上篇文章介绍S4时所说的个性化搜索广告的会话特征分析。而Yahoo当初创建S4项目的直接业务需求就是为了在搜索引擎的‘cost-per-click’广告中,能根据当前情景上下文(用户偏好,地理位置,已发生的查询和点击等)来估计用户点击的可能性并实时做出调整。    <p>  这种高可拓展性,能处理高频数据和大规模数据的实时流计算解决方案将被应用于实时搜索,高频交易和社交网络上。而流计算并不是最近的热点,金融机构的交易系统正是一个典型的流计算处理系统,它对系统的实时性和一致性有很高要求。</p>    <p>  推ter列举了storm的三大作用领域:</p>    <p>  1) 信息流处理(Stream Processing)</p>    <p>        Storm可以用来实时处理新数据和更新数据库,兼具容错性和可扩展性。</p>    <p>  2) 连续计算(Continuous Computation)</p>    <p>        Storm可以进行连续查询并把结果即时反馈给客户,比如将推ter上的热门话题发送到客户端。</p>    <p>  3) 分布式远程过程调用(Distributed RPC)</p>    <p>        Storm可以用来并行处理密集查询,Storm的拓扑结构(后文会介绍)是一个等待调用信息的分布函数,当它收到一条调用信息后,会对查</p>    <p>        询进行计算,并返回查询结果。</p>    <p>  <strong>Storm的设计思想</strong></p>    <p>  在Storm中也有对于流stream的抽象,流是一个不间断的无界的连续tuple,注意Storm在建模事件流时,把流中的事件抽象为tuple即元组,后面会解释storm中如何使用tuple。</p>    <p><a href="https://simg.open-open.com/show/e19870e86ba4bf7eb5c7ad5d7ce342fc.jpg"><img style="border-bottom:0px;border-left:0px;display:block;float:none;margin-left:auto;border-top:0px;margin-right:auto;border-right:0px;" title="clip_image002" border="0" alt="推ter Storm 实时数据处理框架分析总结" src="https://simg.open-open.com/show/8854c313a29cd68dd395cd4d1197ecd1.jpg" width="586" height="174" /></a></p>    <p>  Storm认为每个stream都有一个stream源,也就是原始元组的源头,所以它将这个源头抽象为spout,spout可能是连接推ter api并不断发出tweets,也可能是从某个队列中不断读取队列元素并装配为tuple发射。</p>    <p>  有了源头即spout也就是有了stream,那么该如何处理stream内的tuple呢,同样的思想推ter将流的中间状态转换抽象为Bolt,bolt可以消费任意数量的输入流,只要将流方向导向该bolt,同时它也可以发送新的流给其他bolt使用,这样一来,只要打开特定的spout(管口)再将spout中流出的tuple导向特定的bolt,又bolt对导入的流做处理后再导向其他bolt或者目的地。</p>    <p>我们可以认为spout就是一个一个的水龙头,并且每个水龙头里流出的水是不同的,我们想拿到哪种水就拧开哪个水龙头,然后使用管道将水龙头的水导向到一个水处理器(bolt),水处理器处理后再使用管道导向另一个处理器或者存入容器中。</p>    <p><a href="https://simg.open-open.com/show/42d04ea07f16da9f5207c8b1528719e1.jpg"><img style="border-bottom:0px;border-left:0px;display:block;float:none;margin-left:auto;border-top:0px;margin-right:auto;border-right:0px;" title="clip_image004" border="0" alt="推ter Storm 实时数据处理框架分析总结" src="https://simg.open-open.com/show/070581cd417677a694db2a7fa8f96a01.jpg" width="603" height="186" /></a></p>    <p>  为了增大水处理效率,我们很自然就想到在同个水源处接上多个水龙头并使用多个水处理器,这样就可以提高效率。没错Storm就是这样设计的,看到下图我们就明白了。</p>    <p><a href="https://simg.open-open.com/show/b11f3f96d91315cf892e6d7a927d4bf4.png"><img style="border-bottom:0px;border-left:0px;display:block;float:none;margin-left:auto;border-top:0px;margin-right:auto;border-right:0px;" title="clip_image005" border="0" alt="推ter Storm 实时数据处理框架分析总结" src="https://simg.open-open.com/show/a251e6e46f713bfb07660b5430a61f51.png" width="367" height="270" /></a></p>    <p>  对应上文的介绍,我们可以很容易的理解这幅图,这是一张有向无环图,Storm将这个图抽象为Topology即拓扑(的确,拓扑结构是有向无环的),拓扑是storm中最高层次的一个抽象概念,它可以被提交到storm集群执行,一个拓扑就是一个流转换图,图中每个节点是一个spout或者bolt,图中的边表示bolt订阅了哪些流,当spout或者bolt发送元组到流时,它就发送元组到每个订阅了该流的bolt(这就意味着不需要我们手工拉管道,只要预先订阅,spout就会将流发到适当bolt上)。</p>    <p>  插个位置说下storm的topology实现,为了做实时计算,我们需要设计一个拓扑图,并实现其中的Bolt处理细节,Storm中拓扑定义仅仅是一些Thrift结构体(请google一下Thrift),这样一来我们就可以使用其他语言来创建和提交拓扑。</p>    <p>上篇文章说过S4中PE间的事件传递是以一种(K,A)的元素传递,Storm则将流中元素抽象为tuple,一个tuple就是一个值列表value list,list中的每个value都有一个name,并且该value可以是基本类型,字符类型,字节数组等,当然也可以是其他可序列化的类型。</p>    <p>拓扑的每个节点都要说明它所发射出的元组的字段的name,其他节点只需要订阅该name就可以接收处理。</p>    <p>  说到这里,Storm的核心实时处理思想就说完了,不过既然Storm要能发挥实时处理的能力就必须要由良好的架构设计和部署设计,接下来是Storm的集群部署设计,这里Storm的官方介绍得很清楚了,我就直接copy过来,再做一点分析。</p>    <p>  Storm集群表面类似Hadoop集群。但在Hadoop上你运行的是”MapReduce jobs”,在Storm上你运行的是”topologies”。”Jobs”和”topologies”是大不同的,一个关键不同是一个MapReduce的Job最终会结束,而一个topology永远处理消息(或直到你kill它)。</p>    <p>  Storm集群有两种节点:控制(master)节点和工作者(worker)节点。</p>    <p>控制节点运行一个称之为”nimbus”的后台程序,它类似于Haddop的”JobTracker”。Nimbus负责在集群范围内分发代码、为worker分配任务和故障监测。</p>    <p>  每个工作者节点运行一个称之”Supervisor”的后台程序。Supervisor监听分配给它所在机器的工作,基于Nimbus分配给它的事情来决定启动或停止工作者进程。每个工作者进程执行一个topology的子集(也就是一个子拓扑结构);一个运行中的topology由许多跨多个机器的工作者进程组成。</p>    <p><a href="https://simg.open-open.com/show/a4facd7d3a38635767653c2c6284b9f8.png"><span style="color:#000000;"><img style="border-bottom:0px;border-left:0px;display:block;float:none;margin-left:auto;border-top:0px;margin-right:auto;border-right:0px;" title="clip_image007" border="0" alt="推ter Storm 实时数据处理框架分析总结" src="https://simg.open-open.com/show/590cc97bec219667d66f23fda86c5881.png" width="389" height="317" /></span></a></p>    <p>  一个Zookeeper集群负责Nimbus和多个Supervisor之间的所有协调工作(一个完整的拓扑可能被分为多个子拓扑并由多个supervisor完成)。</p>    <p>此外,Nimbus后台程序和Supervisor后台程序都是快速失败(fail-fast)和无状态的;所有状态维持在Zookeeper或本地磁盘。这意味着你可以kill -9杀掉nimbus进程和supervisor进程,然后重启,它们将恢复状态并继续工作,就像什么也没发生。这种设计使storm极其稳定。这种设计中Master并没有直接和worker通信,而是借助一个中介Zookeeper,这样一来可以分离master和worker的依赖,将状态信息存放在zookeeper集群内以快速回复任何失败的一方。</p>    <p>  网络上 有一篇文章说道Storm这种topology结构的使用妙处,即可以进行<strong>stream grouping </strong>从而实现多种处理需求,原文地址:<a href="/misc/goto?guid=4959517947922309632"><span style="color:#3d81ee;">http://blog.sina.com.cn/s/blog_406d9bb00100ui5p.html</span></a> 下面是stream grouping的种类。</p>    <table class="ke-zeroborder" border="0">     <tbody>      <tr>       <td> <p><strong>stream grouping</strong><strong>分类</strong></p> <p>1. Shuffle Grouping: 随机分组, 随机派发stream里面的tuple, 保证每个bolt接收到的tuple数目相同.<br /> 2. Fields Grouping:按字段分组, 比如按userid来分组, 具有同样userid的tuple会被分到相同的Bolts, 而不同的userid则会被分配到不同的Bolts.<br /> 3. All Grouping: 广播发送, 对于每一个tuple, 所有的Bolts都会收到.<br /> 4. Global Grouping: 全局分组,这个tuple被分配到storm中的一个bolt的其中一个task.再具体一点就是分配给id值最低的那个task.<br /> 5. Non Grouping: 不分组,意思是说stream不关心到底谁会收到它的tuple.目前他和Shuffle grouping是一样的效果,有点不同的是storm会把这个bolt放到这个bolt的订阅者同一个线程去执行.<br /> 6. Direct Grouping: 直接分组,这是一种比较特别的分组方法,用这种分组意味着消息的发送者举鼎由消息接收者的哪个task处理这个消息.只有被声明为Direct Stream的消息流可以声明这种分组方法.而且这种消息tuple必须使用emitDirect方法来发射.消息处理者可以通过TopologyContext来或者处理它的消息的taskid (OutputCollector.emit方法也会返回taskid)</p> </td>      </tr>     </tbody>    </table>    <div id="MySignature">     <br /> 作者:     <a href="/misc/goto?guid=4959517936907596573" target="_blank"><span style="color:#3d81ee;">Aga.J</span></a>     <br /> 出处:     <a href="/misc/goto?guid=4959517935595607054" target="_blank"><span style="color:#3d81ee;">http://www.cnblogs.com/aga-j</span></a>     <br />    </div>