[Storm中文文档]Trident教程
Trident是一个基于Storm的用于实时计算的高级抽象原语。它支持高吞吐(每秒百万级别),有状态的流处理,并且还能够提供低延时的分布式查询功能。如果你熟悉一些比较高级的批处理工具,比如Pig和Cascading,那么对于Trident你应该有一种似曾相识的感觉。Trident具有连接,聚合,分组,自定义行为和过滤的功能。除此之外,Trident能够基于内存或者数据库做有状态的,增量式的计算。Trident本身能够保证每个Tuple严格只被执行一次,所以使用Trident很容易构建一个靠谱的Topology。
Illustrative example
下面通过一个例子介绍Trident。这个例子需要做两件事:
- 从一个能产生句子的输入流中实时计算各个单词的数量;
- 实现查询功能:输入一个句子,句子中每个单词用空格分隔,查询这个句子中所有单词出现的数量的总和。
出于演示目的,本例将从一个能够产生无限英文句子的输入流中读取数据:
FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"), new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"), new Values("how many apples can you eat")); spout.setCycle(true);
该Spout能够循环产生无限的英文语句,下面的代码是计算单词出现次数的部分代码:
TridentTopology topology = new TridentTopology(); TridentState wordCounts = topology.newStream("spout1", spout) .each(new Fields("sentence"), new Split(), new Fields("word")) .groupBy(new Fields("word")) .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count")) .parallelismHint(6);
我们一行一行地对照上面的代码来作说明。首先创建一个TridentTopology,它提供了用于构建Trident实时计算程序的一些接口。TridentTopology有一个函数叫做 newStream ,它通过一个指定的Spout创建一个新的数据输入流。在本例中,输入流仅仅是一个比较简单的FixedBatchSpout。输入流也可以是消息队列,比如Kestrel和Kafka。Trident在Zookeeper保存每一个从输入流中读取的Tuple的处理信息,在上面的代码中,字符串”spout1”表示这些Tuple的处理信息在Zookeeper上的存储路径。
Trident是将输入数据分成许多小块做批量处理的。例如,本例中输入的数据流有可能被分割成如下这样的小块:
通常来说,每一个batch可能包含几千到上百万的Tuple,这完全取决于输入的数据量。
Trident提供了一整套比较完整的API来处理这些batch中的数据。这些API与Pig和Cascading的API非常类似:能够分组,连接,聚合,执行自定义行为,还能进行过滤等等。当然,独立处理每一个batch并没有多大意义,所以,Trident提供了跨batch的数据聚合与存储功能,比如存储在Memory,Memcache,Cassandra或者其他存储设备。此外,Trident还能提供一流的实时查询功能。这些状态能够被Trident更新(就像上面的例子),也能作为一个独立的状态源存在(笔者注:这句话不太理解!)。
回到上面的例子,Spout发射了一个包含sentence字段的数据流。下一行代码定义了一个函数Split用于处理输入流中的每一个Tuple:获取sentence字段并将其切割成很多单词。每一个sentence类型的Tuple将会衍生出很多word类型的Tuple:比如本例中’the cow jumped over the moon’这个sentence类型的Tuple将会衍生出6个word类型的Tuple。下面是Split函数的定义:
public class Split extends BaseFunction { public void execute(TridentTuple tuple, TridentCollector collector) { String sentence = tuple.getString(0); for(String word: sentence.split(" ")) { collector.emit(new Values(word)); } } }
正如我们所见到的,Split函数的定义非常简单。仅仅是获取sentence,然后用空格切割成很多word,然后分别发射这些word。
剩余的代码计算word出现的次数并将结果保存起来。首先数据流按照word字段分组,然后每一个分组都被自定义聚合器Count聚合。persistentAggregate函数知道怎么存储和更新计算结果的状态。在本例中,单词出现的次数被保存在内存中,但是也可以替换成其他的存储设备,比如Memcached,Cassandra等等。比如替换存储设备为Memcached很简单,只需要将包含persistentAggregate的这一行代码替换成下面的代码即可,这儿的serverLocations表示的是Memcached集群的机器的host/ip列表:
.persistentAggregate(MemcachedState.transactional(serverLocations), new Count(), new Fields("count")) MemcachedState.transactional()
persistentAggregate存储的值就是所有batch聚合之后的值。
Trident一个比较酷的事情就是它是完全容错的,保证数据严格只被执行一次的。这使得我们很容易构建靠谱的实时计算系统。Trident存储每个Tuple的处理状态,以便在有错误发生时,可以恢复数据并且防止一个数据被重复处理。
persistentAggregate方法会将数据流转换成TridentState对象。在本例中TridentState对象就是所有单词的统计数据。我们将会运用这个TridentState对象来实现一个分布式的实时查询系统。
Topology的另一个部分就是实现一个低延时的分布式查询系统:输入一个句子,句子中的单词用空格分隔,查询系统返回这些单词的统计数据的和。除了在后台是并行执行的,这种查询和普通的RPC调用没有啥区别。下面是一个查询的例子:
DRPCClient client = new DRPCClient("drpc.server.location", 3772); System.out.println(client.execute("words", "cat dog the man"); // prints the JSON-encoded result, e.g.: "[[5078]]"
正如你所见到的,它和普通的RPC调用完全没有区别,除了它是在Storm集群间并行执行的。对于一般的小型的查询,耗时大概在10ms,当然越重的查询花费的时间会更长,尽管延时还受到所分配的资源多少的影响。
实现分布式查询系统的代码如下:
topology.newDRPCStream("words") .each(new Fields("args"), new Split(), new Fields("word")) .groupBy(new Fields("word")) .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count")) .each(new Fields("count"), new FilterNull()) .aggregate(new Fields("count"), new Sum(), new Fields("sum"));
相同的TridentTopology被用来创建一个新的DRPC流,这个函数命名为words。当执行查询操作的时候,这个名字会作为DRPCClient的第一个参数。
每一个DRPC请求会被当做一个只有一个Tuple的batch处理。Tuple包含一个叫做args的字段,该字段表示DRPC客户端提供的参数,在本例中,该参数就是一个sentence。
首先,Split函数用来将输入的sentence切割成很多个word。然后数据流会按照word分组,然后stateQuery函数将会在第一部分创建的TridentState(wordCounts)上执行查询操作。StateQuery接收一个数据源(在本例中,就是已经计算好的单词统计数据)和一个用于查询的函数作为输入。在本例中,我们使用MapGet函数来获取单词的统计值。由于单词是按照和构建TridentState时一样的方法(按照word分组)分组的,所以每一个单词的查询请求都会被路由到管理和更新该单词的统计数据的分区中取执行。
下一步,没有出现过的单词(count值为0)将会被Storm内建的FilterNull过滤器过滤,然后Sum函数将会把所有单词的统计数据做一次求和操作,保存到sum字段中,并返回给DRPC客户端。
Trident致力于提高Topology结构的性能。在上面的示例中,Trident自动完成了两件事:
- 读取和写入操作会自动使用batch的方式执行,如果有20次更新需要被同步到数据库中,Trident会自动将这些操作汇总到一起,只做一次读写操作,而不是20次。因此Trident可以在方便你计算的同时提高极高的性能。
- Trident对聚合操作做了极大的优化。Trident并不是简单地把一个Group中所有的Tuple都发送到同一个机器上进行聚合,如果条件允许,Trident在将数据通过网路发送之前已经做了部分聚合操作,Count聚合器在每一个分区分别计算统计值,然后通过网络发送分区的计算结果,然后将所有分区的计算结果进行汇总得到总的结果。这与MapReduce的计算模型非常相似。
再看一个Trident的例子:
Reach
这个例子是一个用于计算一个给定的URL的Reach的纯粹的DRPC Topology。什么是URL的Reach?Reach是指在推ter上看到过一个URL的不同的人的数量,要计算Reach值,首先你要获取曾经发布过该URL的所有人,然后获取所有这些人的所有粉丝,然后将这些粉丝做唯一化处理,得到的数字就是该URL的Reach值。如果使用单机计算URL的Reach值,这将会是一个非常繁重的任务,因为这将会产生成千上万的数据库请求,千万级别的Tuple数量。使用Storm和Trident,你能够将上面说到的这些步骤在集群机器间作并行计算。
该Topoloy将会从两个地方读取数据。一个用于读取曾经发布过该URL的人,另一个用于读取这些人的粉丝。定义如下:
TridentState urlToTweeters = topology.newStaticState(getUrlToTweetersState()); TridentState tweetersToFollowers = topology.newStaticState(getTweeterToFollowersState()); topology.newDRPCStream("reach") .stateQuery(urlToTweeters, new Fields("args"), new MapGet(), new Fields("tweeters")) .each(new Fields("tweeters"), new ExpandList(), new Fields("tweeter")) .shuffle() .stateQuery(tweetersToFollowers, new Fields("tweeter"), new MapGet(), new Fields("followers")) .parallelismHint(200) .each(new Fields("followers"), new ExpandList(), new Fields("follower")) .groupBy(new Fields("follower")) .aggregate(new One(), new Fields("one")) .parallelismHint(20) .aggregate(new Count(), new Fields("reach"));
上面的的topology使用newStaticState方法创建一个TridentState对象,表示一种外部存储,使用这个TridentState对象,我们就能在该topology上面进行查询了,和其他所有状态源一样,查询请求会自动转换成batch类型的请求,以提高性能。
这个拓扑的定义非常简单,它就是一个简单的批处理任务。首先,urlToTweeters用于查询曾经发布过该URL的人,返回一个列表,然后使用ExpandList函数将该列表转换成一个个的Tuple分别发射出去。
接下来,我们获取每一个tweeter的followers。我们使用suffle函数将需要处理的tweeter分配到集群的每一个分区,然后集群的每一个分区会分别获取他们收到的tweeter的follower,可以为该步骤设置很大的并行度,提高查询性能。
接下来,tweeter的所有follower需要去重,这个可以分两个步骤完成:首先按照follower分组,然后使用One聚合器对每一个分组进行聚合,One聚合器仅仅是简单地为每一个分组发射一个Tuple(重复的Tuple被舍弃),保存在one字段中,然后使用Count聚合器统计Tuple的数量,存入reach字段中,这个值就是URL的Reach值。One聚合器的定义如下:
public class One implements CombinerAggregator<Integer> { public Integer init(TridentTuple tuple) { return 1; } public Integer combine(Integer val1, Integer val2) { return 1; } public Integer zero() { return 1; } }
这是一个”combiner aggregator”,为了提高性能,它会在将Tuple通过网络传输之前做部分聚合操作。Sum集合器也是一个”combiner aggregator”,因此在最后计算总值是非常高效的。
再看看Trident更细节的东西.
Fields and tuples
Trident的数据模型就是一个TridentTuple,它是一个命名的值列表。在topology中,TridentTuple是在一系列的计算中增量产生的。这些操作一般以一组字段作为输入,然后产生一组输出字段,输入一般是输入Tuple的一组子字段。
参照如下的例子,假设你有一个输入流,命名为stream,它包含三个字段: x, y, z, 为了运行一个过滤器,并且这个过滤器只接受输入流中的y字段,我们可以这样写:
stream.each(new Fields("y"), new MyFilter())
假设MyFilter的定义如下:
public class MyFilter extends BaseFilter { public boolean isKeep(TridentTuple tuple) { return tuple.getInteger(0) < 10; } }
只保留y字段的值小于10的Tuple。TridentTuple传给MyFilter过滤器的输入Tuple包含一个字段y。需要注意的是,选择一个Tuple的某些字段的这个操作是非常高效的。
接下来看看”function fields”是如何工作的。假设你定义了一个函数:
public class AddAndMultiply extends BaseFunction { public void execute(TridentTuple tuple, TridentCollector collector) { int i1 = tuple.getInteger(0); int i2 = tuple.getInteger(1); collector.emit(new Values(i1 + i2, i1 * i2)); } }
该函数接收两个数字同时发射两个数字:输入的两个数字的和和输入的两个数字的积。假设你有一个输入流,包含x,y,z三个字段,你可以像下面这样使用上面的函数:
stream.each(new Fields("x", "y"), new AddAndMultiply(), new Fields("added", "multiplied"));
函数的输入字段是追加到输入的字段后面的,所以该函数执行后的输出会包含5个字段:x,y,z, added, multiplied,added字段是AddAndMultiply发射的第一个字段,multiplied是第二个字段。
但是对于聚合操作,新产生的字段将会替换输入的字段。所以,如果你有一个输入流,包含val1,val2两个字段,当你做如下的操作:
stream.aggregate(new Fields("val2"), new Sum(), new Fields("sum"))
之后,输出的Tuple会只包含sum一个字段,表示该batch种所有val2字段的值的总和。
对于分组之后再聚合的操作,输出字段将会包含分组的字段和聚合操作新产生的字段。比如:
stream.groupBy(new Fields("val1")) .aggregate(new Fields("val2"), new Sum(), new Fields("sum"))
上面的例子的输出结果中将会包含val1和sum两个字段。
State
实时计算系统一个比较关键的问题就是如何确保在有错误发生或者重试时还能保证计算的正确性。出问题是无法避免的,所以当某个节点宕机或者其他错误出现时,我们需要重试。现在的问题是:如何保证在重试过程中,一条数据值被处理一次?
这是一个比较困难的问题,我们通过一个例子进行说明。假设你现在正在统计输入流中处理过的Tuple的数量,并且需要将统计结果保存到数据库中。如果你仅仅在数据库中存储一个统计值,现在如果你想进行一次状态更新,那么你将无法知道当前的这个Tuple是否之前已经被处理过,或者之前尝试处理过,数据库也更新成功,但是之后的某些步骤失败了,亦或者其他步骤都处理成功了,但是更新数据库失败了。
要解决这个问题需要做如下两件事情:
- 为每一个batch分配一个唯一的transaction id(txid),当batch数据重试时,该batch会具有和之前一样的txid;
- 数据的更新操作在batch之间是强有序的。也就是说,如果batch 2更新完成之前,batch 3不允许更新。
具备以上两个条件之后,你就能实现有且仅有一次更新的目的。除了保存统计值之外,你还需要将txid也保存在数据库中。当更新数据时,你就可以将数据库中保存的txid和当前处理的batch的txid做比较。如果两者相等,你应该忽略该batch,因为batch之间的处理是强有序的,你能够跟确定当前batch在之前已经被处理过了;如果两者不一致,则表示当前batch之前没有处理过,你需要在数据库中更新数据。
当然,上面说到的这些操作都不需要你自己实现,他们都已经被Trident封装在内部的实现里,并且会自动实现。如果你不想在数据库中花费外的空间去存储txid,你可以不做。但是在这种情况下,Trident只能保证一个Tuple至少被处理一次,无法保证只被处理一次。可以参考 这篇文档 解更多关于Trident State的知识。
State允许你使用任何策略来保存状态。所以它可以将状态保存在外部的数据库,也可以保存在内存中并备份到HDFS中(类似于Hbase的工作模式)。State并不需要永久保存状态,例如,你可以实现一个内存版的State,仅仅保存最近的X个小时的数据,老数据直接丢弃。可以参照 这个项目 看看Memcached integration的实现。
Execution of Trident topologies
Trident的Topology会被编译成效率最高的Storm Topology。只有在需要对数据进行repartition的时候(如groupby或者shuffle)才会把tuple通过network发送出去,比如你有一个Trident Topology如下:
它将被编译成如下的Storm Topology:
英文文档原文地址: http://storm.apache.org/documentation/Trident-tutorial.html