Spring整合ActiveMQ实践

jopen 10年前

demo下载:testjms.zip

ActiveMQ是一个开源的消息中间件,完美的遵循JMS规范,Spring提供了spring-jms模块来简化jms集成。所以给基于spring的应用添加jms功能是轻松愉快加浪漫的。
ActiveMQ提供了两种消息模式:Queue和topic,这两种消息模式的区别为:
Queue模式下,Provider发布的一条消息只能被一个Consumer消费。
topic模式下,Provider发布的一条消息能被多个Consumer消费。

开发步骤:

1、搭建ActiveMQ的环境

1)去ActiveMQ的官方下载ActiveMQ服务器,http://activemq.apache.org
2)启动ActiveMQ服务器,Windows下为:cd ${ActiveMQ-dir}/bin目录,执行activemq start
3)访问控制台,http://localhost:8161/admin,用户名和密码为[admin/admin],如果能正常访问,那么服务器搭建成功

2开发消息的Provider端(附件的testjms-send工程)

1)使用maven的maven-archetype-quickstart向导创建一个maven工程
2)在pom.xml中添加如下的依赖

 
<dependency>  <groupId>org.springframework</groupId>  <artifactId>spring-jms</artifactId>  <version>4.1.5.RELEASE</version>  </dependency>     <dependency>  <groupId>org.apache.activemq</groupId>  <artifactId>activemq-all</artifactId>  <version>5.11.1</version>  </dependency>  <dependency>  <groupId>log4j</groupId>  <artifactId>log4j</artifactId>  <version>1.2.17</version>  </dependency>



</div>

添加log4j是便于spring-jms输出日志信息。
3)在src/main/resources下新增log4j.properties文件
4)在src/main/resources下新增applicationContext.xml文件
该文件的完整内容如下:

 
<?xml version="1.0" encoding="UTF-8"?>  <beans xmlns="http://www.springframework.org/schema/beans"  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  xmlns:context="http://www.springframework.org/schema/context"  xsi:schemaLocation="  http://www.springframework.org/schema/context  http://www.springframework.org/schema/context/spring-context-3.0.xsd  http://www.springframework.org/schema/beans  http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">     <context:component-scan base-package="cn.javacoder.testjms_send"/>     <bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">  <property name = "brokerURL" value="tcp://localhost:61616" />  </bean>  <bean id="queue" class="org.apache.activemq.command.ActiveMQQueue">  <constructor-arg value = "test.queue" />  </bean>  <!-- <bean id="topic"class="org.apache.activemq.command.ActiveMQTopic">  <constructor-arg value="test.topic"/> </bean> -->     <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">  <property name = "connectionFactory" ref="connectionFactory" />  </bean>  </beans>



</div>

声明了三个bean,connectionFactory,queue,jmsTemplate,
connectionFactory类似jdbc的Connection对象,表示要链接的服务器的信息。
queue类似jdbc的某张表,表示将消息存在的位置。本例使用的队列模式
jmsTemplate是spring-jms提供的一个模板类,封装了jms操作时的一些例行代码。

5)接下来看看main方法

 
public static void main( String[] args )  {  User user = new User();  user.setEmail("javacoder.cn@hotmail.com");  user.setPassword("123456");  user.setPhone("123456");  user.setSex('M');  user.setUsername("javacoder.cn");     ApplicationContext context = new ClassPathXmlApplicationContext("classpath:applicationContext.xml");  SendService sendService = (SendService)context.getBean("sendService");  sendService.send(user);  System.out.println("send successfully, please visit http://localhost:8161/admin to see it");  }



</div>

main方法很简单,填充了一个User对象[注意发送端和接收端的对象的包名必须一致,且要实现Serialize接口]。利用applicationContext.xml构造了一个applicationContext对象,ApplicationContext中获取一个SendService对象,调用send()方法将user对象发送到ActiveMQ服务器中。

6)最后看看SendService的实现

 
@Component  public class SendService {  @Autowired  JmsTemplate jmsTemplate;     public void send(final User user) {  jmsTemplate.send("test.queue", new MessageCreator() {     public Message createMessage(Session session) throws JMSException {  return session.createObjectMessage(user);  }  });  }  }



</div>

超级简单,@Component声明SendService类能被自动注册,
注入了JmsTemplate对象,在send()中调用jmsTemplate.send()发送消息,本例发送的是一个ObjectMessage。

7)测试:
执行本demo,访问http://localhost:8161/admin,如图:


表示ActiveMQ确实收到了一条消息。

Consumer端实现(testjms-receive工程)

基本的步骤类似于Provider端的实现,下面讲讲不同点:
1、main()入口
由于DefaultMessageListenerContainer采用的是Deamon实现方式,所以当我们的应用停止的时候我们应该将其shutdown
具体代码参见main()方法:

 
public static void main( String[] args ) throws Exception  {  ApplicationContext context = new ClassPathXmlApplicationContext("classpath:applicationContext.xml");  //阻塞主线程  System.in.read();     DefaultMessageListenerContainer container = (DefaultMessageListenerContainer)context.getBean("jmsContainer");  container.shutdown();  }



</div>

使用 System.in.read();调用阻塞主线程,当我们通过键盘输入一些字符回车后DefaultMessageListenerContainer被shutdown

2)MessageDriven接收端实现
如果我们调用jms的receive接口方法,如果队列中没有可读的消息,那么当前线程将阻塞直到有消息可用或者超时为止。那能不能等有消息的时候让ActiveMQ主动的回调Consumer呢,这就是MessageDriven bean的思想啦。
spring-jms对MessageDriven bean的支持相当给力,即提供了DefaultMessageListenerContainer 类,也提供了jms命名空间的<jms:listener-container>指令。直接使用 DefaultMessageListenerContainer类需要你的回调类实现MessageListener接口,如我在demo中 ReceiveService2类的处理逻辑。使用<jms:listener-container>指令,那么你的回调类就是一个POJO 类啦,这就是spring-jms所谓的MessageDriven POJO。具体可以参考我的ReceiveService类的实现以及<jms:listener-container>元素的声明。

3)测试:
运行Consumer,后,会发现控制台打印出了如下的信息:
from MessageListener —>username:javacoder.cn|email:javacoder.cn@hotmail.com
当再次访问http://localhost:8161/admin时发现该消息确实被消费了。

来自:http://www.javacoder.cn/?p=416