SyncSpout:用来构造可交互的、同步的 Storm 拓扑的组件

zchkingdom 8年前
   <h2><img src="https://simg.open-open.com/show/bec93a5207325f491eacaa82337d4190.png"> SyncSpout简介</h2>    <p>SyncSpout是上海华瑞银行(SHRB)大数据团队开发的,用来构造可交互的、同步的Storm拓扑的组件。我们在做实时推荐系统中,希望将Storm的并发性和分布式计算能力应用到“请求-响应”范式中, 比如客户的某次购买行为能够以消息的形式发送到storm拓扑中,storm在指定时间返回推荐结果,也就是说storm需要具有可交互性。基于这样的背景,大数据团队开发了SyncSpout组件, 该组件可以接收客户端异步的消息,经过Storm拓扑异步计算,在指定时间内返回给客户端。</p>    <h2>架构图</h2>    <p style="text-align:center"><img src="https://simg.open-open.com/show/b2991d8db5a2d9f1e67df4f08f365628.jpg"></p>    <h2>关键组件介绍</h2>    <ul>     <li>SyncSpout:继承storm的IRichSpout,用于接收客户端调用消息并将消息emit出去的Spout</li>     <li>SendBolt:拓扑中发送计算结果的bolt,该bolt将计算结果返回给客户端</li>     <li>SyncSpoutClient:用于向SyncSpout发送同步消息,并在指定时间内获取结果</li>    </ul>    <h2>特性</h2>    <ul>     <li>使普通的storm应用可交互</li>     <li>storm应用重启后,客户端可自动重连</li>     <li>对storm应用几乎没有侵入,对业务没有侵入</li>     <li>storm集群返回的计算结果能够准确的返回给指定客户端的某次调用</li>     <li>客户端可发送任意类型的消息给storm应用;storm应用可返回任意类型的消息给客户端</li>     <li>客户端可在指定时间内同步获取storm应用返回的计算结果</li>     <li>支持高并发,在单机环境下1000并发量基本在100毫秒内返回</li>    </ul>    <h2>与Storm官方DRPC的异同</h2>    <ul>     <li>都能接收一个远程请求,发送请求到storm拓扑,从storm拓扑接收结果,发送结果回等待的客户端</li>     <li>DRPC只能处理字符串;SyncSpout可以处理任意可序列化的类型</li>     <li>DRPC仅能处理“线性的”DRPC拓扑,计算以一连串步骤的形式表达;SyncSpout能够处理任意类型的storm拓扑</li>     <li>DRPC的功能被移植到了Trident中,从原生Storm被废弃了;SyncSpout会被SHRB一直维护</li>    </ul>    <h2>用法</h2>    <h3>客户端</h3>    <pre>  <code class="language-python">// 创建客户端  val client = new SyncSpoutClient(topName)  // 初始化  client.init()  // 向远程storm集群发送消息,并在1000毫秒内返回,若超时则返回null指针  val syncResult = client.ask(ClientMsg("这是发送的消息,可以是任意类型"),1000).asInstanceOf[String]  println(s"返回消息是[$syncResult],可以是任意类型")</code></pre>    <h3>storm集群</h3>    <pre>  <code class="language-python">val builder = new TopologyBuilder()  // ActorSpout用于接收消息  builder.setSpout("syncSpout",SyncSpout(),2)  // SimpleBolt用于处理消息  builder.setBolt("simpleBolt",new SimpleBolt(),2).setNumTasks(4).shuffleGrouping("syncSpout")  // SendBolt用于返回消息  builder.setBolt("sendBolt",new SendBolt(),2).shuffleGrouping("simpleBolt")  val cluster = new LocalCluster()  val topName = "SyncSpoutTop"  val conf = new Config()  conf.setNumWorkers(2)  cluster.submitTopology(topName,conf,builder.createTopology())  println( "SyncSpout 启动成功!" )</code></pre>    <p>注意点</p>    <ul>     <li>客户端实例化时的topName就是storm集群中的名称</li>     <li>sync-spout-server.conf、sync-spout-client.conf中需要配置zookeeper的host列表</li>    </ul>    <h2>引用第三方类库</h2>    <ul>     <li>zkclient: <a href="/misc/goto?guid=4959731376323629560" rel="nofollow,noindex">https://github.com/yuluows/zkclient.git</a></li>     <li>MPSC队列:参考akka_2.11-2.4.11的AbstractBoundedNodeQueue类</li>    </ul>    <p> </p>    <p> </p>