Spring整合ActiveMQ实践
demo下载:testjms.zip
ActiveMQ是一个开源的消息中间件,完美的遵循JMS规范,Spring提供了spring-jms模块来简化jms集成。所以给基于spring的应用添加jms功能是轻松愉快加浪漫的。
ActiveMQ提供了两种消息模式:Queue和topic,这两种消息模式的区别为:
Queue模式下,Provider发布的一条消息只能被一个Consumer消费。
topic模式下,Provider发布的一条消息能被多个Consumer消费。
开发步骤:
1、搭建ActiveMQ的环境
1)去ActiveMQ的官方下载ActiveMQ服务器,http://activemq.apache.org
2)启动ActiveMQ服务器,Windows下为:cd ${ActiveMQ-dir}/bin目录,执行activemq start
3)访问控制台,http://localhost:8161/admin,用户名和密码为[admin/admin],如果能正常访问,那么服务器搭建成功
2开发消息的Provider端(附件的testjms-send工程)
1)使用maven的maven-archetype-quickstart向导创建一个maven工程
2)在pom.xml中添加如下的依赖
<dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>4.1.5.RELEASE</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.11.1</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency>
添加log4j是便于spring-jms输出日志信息。
3)在src/main/resources下新增log4j.properties文件
4)在src/main/resources下新增applicationContext.xml文件
该文件的完整内容如下:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation=" http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd"> <context:component-scan base-package="cn.javacoder.testjms_send"/> <bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory"> <property name = "brokerURL" value="tcp://localhost:61616" /> </bean> <bean id="queue" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg value = "test.queue" /> </bean> <!-- <bean id="topic"class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="test.topic"/> </bean> --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name = "connectionFactory" ref="connectionFactory" /> </bean> </beans>
声明了三个bean,connectionFactory,queue,jmsTemplate,
connectionFactory类似jdbc的Connection对象,表示要链接的服务器的信息。
queue类似jdbc的某张表,表示将消息存在的位置。本例使用的队列模式
jmsTemplate是spring-jms提供的一个模板类,封装了jms操作时的一些例行代码。
5)接下来看看main方法:
public static void main( String[] args ) { User user = new User(); user.setEmail("javacoder.cn@hotmail.com"); user.setPassword("123456"); user.setPhone("123456"); user.setSex('M'); user.setUsername("javacoder.cn"); ApplicationContext context = new ClassPathXmlApplicationContext("classpath:applicationContext.xml"); SendService sendService = (SendService)context.getBean("sendService"); sendService.send(user); System.out.println("send successfully, please visit http://localhost:8161/admin to see it"); }
main方法很简单,填充了一个User对象[注意发送端和接收端的对象的包名必须一致,且要实现Serialize接口]。利用applicationContext.xml构造了一个applicationContext对象,ApplicationContext中获取一个SendService对象,调用send()方法将user对象发送到ActiveMQ服务器中。
6)最后看看SendService的实现
@Component public class SendService { @Autowired JmsTemplate jmsTemplate; public void send(final User user) { jmsTemplate.send("test.queue", new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createObjectMessage(user); } }); } }
超级简单,@Component声明SendService类能被自动注册,
注入了JmsTemplate对象,在send()中调用jmsTemplate.send()发送消息,本例发送的是一个ObjectMessage。
7)测试:
执行本demo,访问http://localhost:8161/admin,如图:
表示ActiveMQ确实收到了一条消息。
Consumer端实现(testjms-receive工程)
基本的步骤类似于Provider端的实现,下面讲讲不同点:
1、main()入口
由于DefaultMessageListenerContainer采用的是Deamon实现方式,所以当我们的应用停止的时候我们应该将其shutdown
具体代码参见main()方法:
public static void main( String[] args ) throws Exception { ApplicationContext context = new ClassPathXmlApplicationContext("classpath:applicationContext.xml"); //阻塞主线程 System.in.read(); DefaultMessageListenerContainer container = (DefaultMessageListenerContainer)context.getBean("jmsContainer"); container.shutdown(); }
使用 System.in.read();调用阻塞主线程,当我们通过键盘输入一些字符回车后DefaultMessageListenerContainer被shutdown
2)MessageDriven接收端实现
如果我们调用jms的receive接口方法,如果队列中没有可读的消息,那么当前线程将阻塞直到有消息可用或者超时为止。那能不能等有消息的时候让ActiveMQ主动的回调Consumer呢,这就是MessageDriven bean的思想啦。
spring-jms对MessageDriven bean的支持相当给力,即提供了DefaultMessageListenerContainer 类,也提供了jms命名空间的<jms:listener-container>指令。直接使用 DefaultMessageListenerContainer类需要你的回调类实现MessageListener接口,如我在demo中 ReceiveService2类的处理逻辑。使用<jms:listener-container>指令,那么你的回调类就是一个POJO 类啦,这就是spring-jms所谓的MessageDriven POJO。具体可以参考我的ReceiveService类的实现以及<jms:listener-container>元素的声明。
3)测试:
运行Consumer,后,会发现控制台打印出了如下的信息:
from MessageListener —>username:javacoder.cn|email:javacoder.cn@hotmail.com
当再次访问http://localhost:8161/admin时发现该消息确实被消费了。
来自:http://www.javacoder.cn/?p=416