ActiveMQ的插件开发介绍
在ActiveMQ中,Broker代表一个运行的MQ节点,ActiveMQ的插件实际上是基于Broker的一个Filter链,整个设计类似于 servlet的Filter结构,所有的Plugin构成一个链式结构,每个插件实际上都是一个"Interceptor",类结构图如下:
其中Broker接口封装了一个AMQ节点的方方面面的方法,包括连接管理、session管理、消息的发送和接收以及其它的一些功 能,BrokerFilter实现这个接口,并提供了链式结构支持,可以拦截所有Broker方法的实现并传递结果给链式结构的下一个,形成了一个完整 的"职责链"模式,具体层次关系如下,其中,"System Plugin"是指AMQ内部使用Plugin机制实现的一些系统功能,用户不能定制,"AMQ Plugin"指的是ActiveMQ已经实现好了,可以在配置文件中自由选择的一些插件,例如简单的安全插件,JAAS安全插件和DLQ插件等等,用户 插件就是指用户自己实现的amq插件,需要用户把相关jar包放入到amq的启动classpath中,并在配置文件中进行配置才能正确加载的插件。
在上面这个层次结构中,最下面的RegionBroker是核心组件,在其之上的都是Broker的插件,继承之于BrokerFilter,和Broker保持接口兼容但是扩展Broker的功能。
下面举一个简单的例子,具体说明一下AMQ的插件是如何工作的。
我们在使用AMQ的过程中发现,在测试环境维护方面有很大的麻烦,具体表现在很多同学在测试项目的时候往往只关注自己项目牵涉的队列,不会去消费其 他"不相关"的队列,这样导致的一个问题就是ActiveMQ经常发生大量数据阻塞,导致测试环境不可用,影响相关项目的测试工作。为了避免这个问题,我 们假定在测试环境可以定义以下一些限制条件:
1、 所有队列堆积消息不超过1000条,超过之后立即清除。
2、 消息超过1个小时没有消费,就直接过期。
我们可以编写一个简单的amq插件来完成这两个限制条件:
首先,编写一个插件安装类:
package com.alibaba.napoli.plugins; import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.BrokerPlugin; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; public class MessageControlBrokerPlugin implements BrokerPlugin { private static Log log = LogFactory.getLog(StatisticsBrokerPlugin.class); public Broker installPlugin(Broker broker) throws Exception { log.info("install MessageControlBrokerPlugin"); return new MessageControlBroker(broker); } } 其次,编写真正的插件实现: package com.alibaba.napoli.plugins; import java.io.IOException; import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.BrokerFilter; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Queue; import org.apache.activemq.command.Message; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; /** * 开发环境管理插件,符合两个条件进行消息清理:<br> * 1 消息累积超过1000条 * 2 消息超过1个小时无人消费 * @author guolin.zhuanggl * */ public class MessageControlBroker extends BrokerFilter { public static Log log = LogFactory.getLog(DiscardingDLQBroker.class); private static final long DEFAULT_EXPIRATION = 3600*1000; private static final long DEFAULT_PURGE_COUNT = 1000; public MessageControlBroker(Broker next) { super(next); } @Override public void messageExpired(ConnectionContext context, MessageReference message) { Message msg = null; try { msg = message.getMessage(); } catch (IOException e) { log.error("failed to fetch content: ",e); } purgeMessage(msg); // TODO Auto-generated method stub super.messageExpired(context, message); } /** * 清除队列中的所有消息 */ private void purgeMessage(Message message){ Destination r = message.getRegionDestination(); if(r instanceof Queue){ try { //如果累积消息超过1000个,清除队列消息 if(((Queue) r).getMessages().size() > DEFAULT_PURGE_COUNT){ ((Queue) r).purge(); } } catch (Exception e) { // TODO Auto-generated catch block log.error("failed to purge queue "+r.getName(),e); } } } /** * 当消息发送时,全部设置过期时间1个小时,测试环境专用!!! */ @Override public void send(ProducerBrokerExchange producerExchange,Message messageSend) throws Exception { long oldExp = messageSend.getExpiration(); messageSend.setExpiration(oldExp < DEFAULT_EXPIRATION && oldExp > 0 ? oldExp : DEFAULT_EXPIRATION ); purgeMessage(messageSend); super.send(producerExchange, messageSend); } }
然后,将这两个类打包为myplugin.jar,并放在activemq启动目录下的lib目录下
最后,在activemq.xml文件中增加一个简单的spring配置项:
<bean xmlns="http://www.springframework.org/schema/beans"
id="purgePlugin"
class="com.alibaba.napoli.plugins.MessageControlBrokerPlugin">
</bean>
然后,重启activemq,就会发现这个插件已经被加载。