消息总线重构之简化客户端
这段时间对消息总线进行了再次重构。本次重构主要针对消息总线的pubsub组件以及对client的简化,同时谈谈对消息总线的一些想法。
简化client的复杂度
之前的client需要同时连接两个分布式组件。消息总线的访问需要用户提供pubsuberHost,pubsuberPort参数,因此它首先连接的就是pubsuber。而消息总线是基于RabbitMQ构建的,因此它必然还需要连接RabbitMQ。而之所以没有需要用户程序提供RabbitMQ Server的地址信息,是因为它是通过pubsuber间接获得的。
当时的想法是出于安全的角度考虑,不让用户直面MQ Server,而MQ的选择理论上可以有多种,这些对用户都是透明的。但作为一个后端组件,安全问题本不是最重要的关注目标,而替换MQ这样的成本无异于 重写消息总线,这样的可能性也不大。除此之外这还带来了额外的复杂度与高失败率(当pubsuber与RabbitMQ两者中有其一失败,消息总线就将陷 入混乱),结合到消息总线较多的长连接场景(比如,push模式的consume),一旦一个组件失效就可能导致客户程序的重启(为了重新初始化连接)。
让消息总线客户端只连接单一的RabbitMQ组件,可以大大降低失效的概率,而且RabbitMQ官方client提供的失效重试机制也能更好得发挥作用。
用RPC获取授权信息
因为之前的pubsuber部分承担了授权信息数据源的角色,移除之前的pubsuber组件,那么就需要重新设计获取远程授权信息的方案。因为 RabbitMQ正好提供了基于JSON的轻量级RPC机制,我们就可以通过RPC从后端获取授权信息,而让后端去跟pubsuber交互。之前就曾有过 这个想法,后来在使用HBase时,发现其java client内部也有通过RPC跟Master节点交互,于是这次就确定用这种方式来实现。其实采用RPC的形式可以大大简化客户端的逻辑实现,而且也大 大降低了升级成本。
修改broadcast和pubsub的实现
之前pubsuber在客户端还起到了两个作用:实现广播机制和实现实时控制。所以,如果要将pubsuber从client里移除,就要重新实现这两个功能。也就是说,要找到另一种支持 实时push 的机制,考虑到其实RabbitMQ本身就可以实现长连接的即时消费功能,这里选择直接基于RabbitMQ本身来实现。
我们新建了一个内部使用的exchange来实现消息路由。跟其他topic类型的exchange不同,这里我们采用并不常见的 headers 类型的exchange。
header类型:根据消息的消息头里包含特殊的key-value对来进行路由
考虑到我们需要重新实现上面两个功能,所以,我们将消息分成两类:event和notice。
- event:内部控制消息
- notice:广播消息
在发送这两个消息时,只需要在消息头中指定对应的header的key-value即可实现自动路由。这两种消息类型分别对应的是绑定在inner exchange后的队列的类型。
如何节省RabbitMQ Server的资源
考虑到几乎每个client都有接受这两种类型的消息的需求,而我们为每个客户端在该exchange下创建两个queue多少有些过于浪费,最好的做法当然是在客户端使用的会话周期内建立两个临时队列,等客户端使用结束就可立即销毁队列回收资源。
得益于RabbitMQ丰富的特性,我们可以很容易做到这一点。当我们实例化客户端的时候,我们在内部创建两个临时且排他的队列。所谓临时且排他即一种特殊的队列,它只对创建它的连接可见,当创建它的连接断开或者消费者个数从大于零降到零时,该队列就会被删除,具备这种属性的队列几乎是为一次会话而创建的。
临时且排他属性是通过在创建队列时,指定队列的auto-delete以及exclusive属性同时为true来实现的。
这两个队列被创建后,当前客户端会作为消费者立即挂载在队列上等待event和notice。
发送端无需知晓上述两个队列的具体名称,它只需知道代理exchange以及inner exchange的routing-key即可,然后在发送消息的消息头中指定需要发送的是event还是notice。
代码片段:
InnerEventEntity eventEntity = new InnerEventEntity(); eventEntity.setIdentifier(channel); eventEntity.setValue(new String(data)); eventEntity.setType("event"); String jsonObjStr = GSON.toJson(eventEntity); Message eventMsg = MessageFactory.createMessage(MessageType.QueueMessage); Map<String, Object> map = new HashMap<String, Object>(1); map.put("type", "event"); eventMsg.setHeaders(map); eventMsg.setContent(jsonObjStr.getBytes()); AMQP.BasicProperties properties = MessageHeaderTransfer.box(eventMsg); ProxyProducer.produce(Constants.PROXY_EXCHANGE_NAME, mqChannel, EVENT_ROUTING_KEY_NAME, eventMsg.getContent(), properties);
去除pubsuber的封装
为什么要封装
在去除之前,我想谈谈当初为什么要封装。在最初封装消息总线的时候,我对redis和zookeeper都有所了解,它们都有一些共同的特性,比如:
- 都能够以key-value的形式存取少量数据
- 都能提供Pub/Sub的实时push变更功能
这是消息总线客户端的pubsuber需要的,但为了提供可选性,我在这两个特性上做了一层封装,可以使得这种配置变更组件无论选择哪一个,无需修改代码,两者都可适配。这是当初封装的目的。
为什么要去除封装
首先,去除封装是回到了zookeeper而排除了redis。这么除了发现太多的开源软件都在使用zookeeper来实现这个需求,除了发现这是zookeeper的专长,而redis只是能提供这些功能而已。除了这些,最关键的问题是我发现当涉及到命名服务的特性时,redis将变得不再适合。
在分布式的服务中,很可能会存在多个组件,而这些组件跟应用之间会存在一些逻辑关系,而不都是简单的扁平关系。很多情况下,我们需要将一些关系构 建成树状结构。比如,现在消息总线只变成了平台中的一个组件,我们需要在配置上体现这种关系,所以可能会由原先的扁平关系修改为如下图这样的形式:
在这种类似于文件系统的树状结构下,要实现诸如获取子节点的变更事件这样的联动行为redis将无能为力。这是因为redis的pubsub功能,只提供在key-value(String)类型上。也就是说,它的 value只能是一级关系。当然,为了表示多层关系,你是可以在key里以“.”进行区分,比如”app1”,”app1.message”,虽然你能知 道他们之间的关系,但它们在技术层面上是一样的,无法产生联动变更功能。所以在一些场景下,zookeeper是无法取代的。
一些思考
拓扑的权衡
消息总线最初的目标主要偏向消息传递。但在实现中的添加了一些额外的特性,比如之前的RPC功能。其实,如果单从技术层面上来看消息总线就是收发消息。但如果你将收发消息的主体包含进来(也就是 发送者 和 消费者 )会有一些新的定位。如果有一些消费者做的事情是很通用的,基础的,很多人都需要的或者纯技术性的。那么处理这类消息的消费者就是在提供服务。比如,下面这些:
- 将数据存入ElasticSearch
- 发送短信
- 发送邮件
- 往移动端推送消息
消息总线可以直接提供这些服务以供第三方申请使用,当然如果带上语义来看,RPC的服务端也是一种服务(只不过是同步的服务而已),其他队列也可能在提供某种服务,只是它们的专有性更强,所以消息总线也具有提供服务的能力以及构建服务的基础。
所以我一直在考虑,整个路由图应该是这样构建:
还是这样来构建:
消息租赁
因为消息的通信模型生来就具有异步性。那么消息的消费时机,对于消息总线本身而言是无法知晓的,这就产生了消息长期堆积压垮消息总线的可能。所 以,可能会考虑将消息的永久驻留改为按序驻留。这取决于业务,有些业务消息是具有时效性的,这样的消息如果隔个几周还没有被消费掉,那么它存在的意义几乎 没有了,而它却白白占据着总线服务器的内存或磁盘资源,甚至这些消息将永远得不到消费也有可能。
所谓消息租赁,其实是将现在的永久留存的模式改为临时驻留队列的模式,具体消息能够存活多久,这取决于给消息设置的TTL(time to live)时间,而对于TTL的评估来自于队列申请方根据自身的业务特点而定。当然,TTL可以也可以设置为永久,这需要接收审核。
proxy的必要性
对于一个组件的扩展有很多种模式,比如proxy,smart client,plugin。消息总线封装自RabbitMQ,其实RabbitMQ官方是带plugin模式的扩展机制的,无奈语言所限,力所不及。
而对于proxy跟smart client的两种模式对比来看,可算是各有所长,优势互补。比如在侵入性上proxy侵入性更小,在掌控性上smart client掌控力更强。这里就不再过多比较。
现在对RabbitMQ的扩展采用的是smart client的形式。但这种方式总有它的受限之处,当你处于一个分布式的环境中,服务器上的资源在很多时候是被共享的(比如RabbitMQ里的队列,它可以同时被多 个client消费),你可以将它看成是多个分支河流汇聚的场景,分支是没办法掌控全部的,你只有依靠Proxy Server。
我曾看到携程开源的消息系统是在kafka和mysql前端做了一个proxy(它们称之为broker)。会不会构建一个proxy,看进度吧。但proxy的存在缺失能带来非常多的好处。