Play! Akka Flume实现的完整数据收集
-
前言
现如今,大数据如火如荼。针对用户行为,用户喜好等后续大数据分析也是十分火热。这个小项目实现了后台数据收集的一系列完整流程。
-
项目总体流程以及用到的技术
Play ! 作为web服务器,使用RESTful 规范编写接口(客户端事先埋点,然后调用接口上传数据)
Play !接口接受到的记录(json形式)经过处理后,先保存到 concurrentQueue中
Play! 启动后,start一个Akka schedulable actor.他每隔一段时间,让子actor去poll queue中的数据
调用flume的封装的rpc,将数据发送到指定的端口。
Flume source端接收数据,按照配置重定向数据,sink到console.
3. 后台实现
3.1 编写接口
采用RESTful编写接口,首先在play! 的conf中routes定义接口:
#run log POST /events/runlogs controllers.RunLogs.create()
然后编写controller
public static Result create(){ JsonNode js = request().body().asJson(); RunLog.create(js); //return ok anyway return ok(); }
然后是model
public static void create(JsonNode js) { if (js.has(LOG)) { String logBody = js.findPath(LOG).asText(); //add one log into queue QueueManager.INSTANCE.addRunLog(logBody); } }
可以看到,这些代码遵循MVC规范,首先让play!知道接口的定义,前端发送过来请求的时候,知道调用哪个controller中的哪个方法,并返回数据。而controller将数据从请求体中剥离出来,并发送给真正处理数据的model.
3.2 Queue
看到model中,接收到数据后,添加到queue中保存。
定义为:ConcurrentLinkedQueue<String>
3.3 Akka 定时调度
Akka负责定时从queue取出数据,然后通过rpc发送给flume。
akka的初始化,启动是伴随着play! 的启动而进行的,每个play!只有一个akka system。所以首先要编写一个Global extends GlobalSetting(GlobalSettings is instantiated by the framework when an application starts,to let you perform specific tasks at start-up or shut-down),然后override onStart方法,在此方法中初始化akka的调度器。
代码如下:
ActorRef dispatcher = Akka.system().actorOf(new Props(Dispatcher.class)); Akka.system().scheduler().schedule( Duration.create(200,TimeUnit.MICROSECONDS), Duration.create(2,TimeUnit.SECONDS), dispatcher, "run", Akka.system().dispatcher(), null );
可以看到,每个两秒钟,scheduler就会调用dispatcher,让他工作。
dispatcher在这里相当于一个master,他接到工作后,会通知自己的slave去工作(发送数据给rpc)。
代码如下:
ActorRef workRouter = getContext().actorOf( new Props(WorkerActor.class).withRouter(new RoundRobinRouter(40)) , "transferRouter"); @Override public void onReceive(Object message) throws Exception { ConcurrentLinkedQueue<String> runLogs = QueueManager.INSTANCE.getRunLogs(); dispatch(runLogs); } private void dispatch(Queue<?> queue) { if (!queue.isEmpty()) { Object obj = queue.poll(); List<String> data = new ArrayList<>(); while (obj != null) { data.add(obj.toString()); obj = queue.poll(); } workRouter.tell(new DispatchMsg("runlogs", data), getSelf()); } }
首先定义了一个router,他负责按照轮训算法,找到到底要让哪个slave去工作。
当dispatcher收到消息后,就让router通知WorkerActor去工作,并把从queue取出的数据给他,让他将这些数据通过rpc发送给远端的flume。
这样设计的目的在于:
接口接受消息,暂时保存在queue中,快速回复客户端,不堵塞。
利用akka并发能力,从queue中取出消息,找到一个worker去进行耗时较长的rpc工作。
workeractor
@Override public void onReceive(Object message) throws Exception { if (message instanceof DispatchMsg) { DispatchMsg msg = (DispatchMsg) message; String business = msg.business; List<String> datas = (List<String>) msg.data; sendMsg.ToFlume.sendDataToFlume(datas); } }
3.4 rpc发送
flume集成了rpc
public void sendDataToFlume(List<String> datas) { List<Event> es = new LinkedList<Event>(); for (String data : datas){ // Create a Flume Event object that encapsulates the sample data Event event = EventBuilder.withBody(data, Charset.forName("UTF-8")); es.add(event); } // Send the event try { client.appendBatch(es); System.out.println("data sent"); } catch (EventDeliveryException e) { // clean up and recreate the client client.close(); client = null; client = RpcClientFactory.getDefaultInstance(hostname, port); } }
这里接受的数据为list,批处理。
首先新建client,这里注意hostname跟port,是flume服务器端source的ip跟端口。
然后批量发送数据。
4. Flume
flume的配置内容如下:exemple.conf
a1.channels = c1 a1.sources = r1 a1.sinks = k1 a1.channels.c1.type = memory a1.sources.r1.channels = c1 a1.sources.r1.type = avro a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 41414 a1.sinks.k1.channel = c1 a1.sinks.k1.type = logger
此处flume端为简单的单点配置,source接收41414的rpc消息,然后保存到channel中,sink到console中(数据收集一般sink到HDFS中,并且可以多点收集)。
启动命令如下:
bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console
好了,启动flume后,启动play!然后利用客户端发送消息,就可以在flume端看到消息打印到console了。
项目的所有代码在 https://github.com/YulinGUO/collectEvents
如果有问题,请留言。
来自:http://my.oschina.net/yulinguo/blog/372191