jms与ActiveMQ实践与应用

qcdq1487 9年前

来自: http://www.iteye.com/topic/1119727


词语解释

(有些词可能用的不是很正确,在这里我把自己能意识到的词拿出来解释一下):

1、  跨服务器:专业术语好像叫“跨实例”。意思是,可以在多个服务器(可以是不同的服务器,如resintomcat)之间相互通信。与之对应的是单服务器版。

2、  消息生产者:就是专门制造消息的类。

3、  消息消费者:也叫消息接收者,它主要是实现了消息监听的一个接口,当然,也可以难过Spring提供的一个转换器接口指定任意一个类中的任意方法。

我们都知道,任何一个系统从整体上来看,其实质就是由无数个小的服务或事件(我们可以称之为事务单元)有机地组合起来的。对于系统中任何一个比较复杂的功能,都是通过调用各个独立的事务单元以实现统一的协调运作而实现的。

现在我们的问题是,如果有两个完全独立的服务(比如说两个不同系统间的服务)需要相互交换数据,我们该如何实现?

好吧,我承认,我很傻很天真,我想到的第一个方法就是在需要的系统中将代码再写一遍,但我也知道,这绝对不现实!好吧,那我就应该好好学习学习达人们是如何去解决这样的问题。

第一种方法,估计也是用的最多的,就是rpc模式。这种方法就是在自己的代码中远程调用其它程序中的代码以达到交换数据的目的。但是这种方法很显然地存在了一个问题:就是一定要等到获取了数据之后才能继续下面的操作。当然,如果一些逻辑是需要这些数据才能操作,那这就是我们需要的。

第二种方法就是Hessian,我个人觉得Hessian的实现在本质上与rpc模式的一样,只是它采用了配置,简化了代码。

上面这两个方法,基本上能解决所有的远程调用的问题了。但是美中不足的是,如果我在A系统中有一个操作是需要让B系统做一个响应的,但我又不需要等它响应完才做下面的操作,这该怎么办?于是新的解决方案就需要被提出来,而SUN公司的设计师们也考虑到了,在JAVA中这就被体现为JMSjava message service)。

一、认识JMS

JMS模块的功能只提供了接口,并没有给予实现,实现JMS接口的消息中间件叫JMS Provider,这样的消息中间件可以从Java里通过JMS接口进行调用。

JMS消息由两部分构成:headerbodyheader包含消息的识别信息和路由信息,body包含消息的实际数据。

JMS的通用接口集合以异步方式发送或接收消息。另外, JMS采用一种宽松结合方式整合企业系统的方法,其主要的目的就是创建能够使用跨平台数据信息的、可移植的企业级应用程序,而把开发人力解放出来。

Java消息服务支持两种消息模型:Point-to-Point消息(P2P)和发布订阅消息(Publish Subscribe messaging,简称Pub/Sub,也就是广播模式)。

根据数据格式,JMS消息可分为以下五种:

BytesMessage   消息是字节流。

MapMessage   消息是一系列的命名和值的对应组合。

ObjectMessage   消息是一个流化(即继承Serializable)Java对象。

StreamMessage   消息是Java中的输入输出流。

TextMessage   消息是一个字符串,这种类型将会广泛用于XML格式的数据。

二、使用JMS

在使用JMS时,其步骤很像使用JDBC一样,需要的步骤为:

1、建立消息连接(也就是建立连接工厂);

2、设定消息目的地(其实与步骤1中用的类是一样的,只是它是用来指定目的地,而步骤1中是用来指定消息服务器地址的)

3、创建jmsTemplate实例(为下一步构建消息sessin作准备);

4、创建消息生产者(其中就用到了23两步的产物),它就是一个普通的类,一般是通过send方法发送消息,也可以通过MessageListenerAdapter指定发送信息的方法;

5、创建MDP(也就是消息接收者,它是一个必须实现MessageListener接口的类);

6、为每个MDP建立一个监听容器,当有相应的消息传来,则它会自动调用对应的MDP消费消息。

整个过程就像编写JDBC一样,代码维护量很大。为此,让Spring对其进行管理是个不错的选择。

三、Spring整合JMS

Spring框架提供了一个模板机制来隐藏Java APIs的细节。开发人员可以使用JDBCTemplateJNDITemplate类来分别访问后台数据库和JEE资源(数据源,连接池)。JMS也不例外,Spring提供JMSTemplate类,因此开发人员不用为一个JMS实现去编写样本代码。接下来是在开发JMS应用程序时Spring所具有一些的优势。

1. 提供JMS抽象API,简化了访问目标(队列或主题)和向指定目标发布消息时JMS的使用。

2. 开发人员不需要关心JMS不同版本(例如JMS 1.0.2JMS 1.1)之间的差异。

3. 开发人员不必专门处理JMS异常,因为Spring为所有JMS异常提供了一个未经检查的异常,并在JMS代码中重新抛出

具体的详细步骤与方法参考 spring-reference2.5.pdf 中的第十九章。

下面,我就将我在整个学习过程中实践过的例子一一列举出来,并将在其中遇到的问题和心得给出一定的说明,希望对大家能有所帮助。

四、实例

(一)、配置单服务器版消息机制

1、首先,我们需要配置resin下的resin.conf文件,在其中(<server></server>之间)加上:

<!-- The ConnectionFactory resource defines the JMS factory for creating JMS connections -->

<resource jndi-name="jms/factory"

    type="com.caucho.jms.ConnectionFactoryImpl">

</resource>

<!-- Queue configuration with Resin's database  -->

<resource jndi-name="jms/queue"

    type="com.caucho.jms.memory. MemoryQueue">

<init>

    <queue-name>OssQueue</queue-name>

    </init>

</resource>

<!-- Queue configuration with Resin's database  -->

<resource jndi-name="jms/topic"

    type="com.caucho.jms.memory. MemoryTopic">

<init>

    <queue-name>ossTopic</queue-name>

    </init>

</resource>

注:i、我现在只知道JNDI方式配置消息的连接工厂,我并不知道有没有其它的方式,但我看了许多资料上也没提到其它配置方式。

ii、网上很少有关于在resin中配置JMS消息工厂的资料,只有在resin的官网上才能见到。

iii、上面JNDI配置的地方需要注意的是,大家如果在网上看资料的话,可能会发现网上会比我给出的总是会多一些,也就是总是多一些<data-source>的初始化配置,如:

<resource jndi-name="jms/factory"

    type="com.caucho.jms.ConnectionFactoryImpl">

  <init>

    <data-source>jdbc/database</data-source>

  </init>

</resource>

就这样的配置,单独启动resin是没有问题的,但是如果将其按照下面的Spring配置加到系统中,就会出异常(具体的异常名称我忘了,中文的大概意思是:数据库对象不能转换成JMS连接对象,还有一种情况是启动系统时会内存溢出)。我认为这种配置可能是数据库消息模式的配置(因为JMS有内存和数据库两种管理方式,我目前只学习了内存管理的方式,至于数据库管理方式大家要是有兴趣可以参考:

http://www.oracle.com/technology/books/pdfs/2352_Ch06_FINAL.pdf

2、在web.xml文件中配置一个spring用的上下文:

<context-param>

    <param-name>contextConfigLocation</param-name>

    <param-value>/WEB-INF/jmsconfig.xml</param-value>

</context-param>

<!-- 配置Spring容器 -->

<listener>

    <listener-class>

    org.springframework.web.context.ContextLoaderListener

</listener-class>

</listener>

注:我是将jmsconfig.xml加载到service.xml中随系统启动的。

3、创建jmsconfig.xml用来装配jms,内容如下:

<?xml version="1.0" encoding="UTF-8"?>  

<!DOCTYPE beans PUBLIC "-//SPRING//DTD BEAN//EN"

    "http://www.springframework.org/dtd/spring-beans.dtd">  

 

<beans>

    <bean id="jmsConnectionFactory"

 class="org.springframework.jndi.JndiObjectFactoryBean">

 <property name="jndiName">

     <value>java:comp/env/jms/factory </value>

 </property>

    </bean>

   

    <bean id="destination" class="org.springframework.jndi.JndiObjectFactoryBean">

 <property name="jndiName">

     <value> java:comp/env/jms/queue</value>

 </property>

    </bean>

   

    <!--  Spring JmsTemplate config -->

    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">

 <property name="connectionFactory">

     <bean

  class="org.springframework.jms.connection.SingleConnectionFactory">

  <property name="targetConnectionFactory"

      ref="jmsConnectionFactory"/>

     </bean>

 </property>

    </bean>

   

    <!-- POJO which send Message uses Spring JmsTemplate --> <!--配置消息生产者-->

    <bean id="messageProducer" class="com.focustech.jms.MessageProducer">

 <property name="template" ref="jmsTemplate"/>

 <property name="destination" ref="destination"/>

    </bean>

   

    <!--  Message Driven POJO (MDP) -->

    <bean id="messageListener" class=" com.focustech.jms.MessageConsumer"/>

   

    <!--  listener containerMDP无需实现接口 -->

    <bean id="listenerContainer"

 class="org.springframework.jms.listener.DefaultMessageListenerContainer">

 <property name="connectionFactory" ref="jmsConnectionFactory"/>

 <property name="destination" ref="destination"/>

 <property name="messageListener" ref="messageListener"/>

    </bean>

</beans>

其中:

1)   jmsConnectionFactorydestination都是使用自定义的,而且你会发现,这两个对象的加载类其实是一样的,都是JndiObjectFactoryBean,这是从JNDI读取连接的意思。

3)  MessageProducer是消息发送方。

4)  MessageConsumer实现了一个MessageListener,监听是否收到消息。

4、发送和接收消息的class如下(主要代码)

MessageProducer.java

public class MessageProducer {

    private JmsTemplate template;

    private Destination destination;

 

    public void send(final String message) {

 template.send(destination, new MessageCreator() {

     public Message createMessage(Session session) throws JMSException {

  Message m = session.createTextMessage(message);

  return m;

     }

 });

    }

 

    public void setDestination(Destination destination) {

 this.destination = destination;

    }

 

    public void setTemplate(JmsTemplate template) {

 this.template = template;

    }

 

}

 

MessageConsumer.java

public class MessageConsumer implements MessageListener {

 

    public void onMessage(Message message) {

        try

       {

           System.out.println(((TextMessage) message).getText());

       }

       catch (JMSException e)

       {

       }

    }

}

注:在上面的实例类中,由于在发送方发送的是文本消息(TextMessage),所以在上面的接收者代码中我直接将其转换成TextMessage就行了。如果是在真正的环境下,应该首先判断一下对方发送的是什么类型,然后才转换成对应的消息。

5、测试消息

为了测试的方便,可以在webroot下新建一个test.jsp,然后将下面的代码放到JSP的代码中,然后在网页地址栏中输入链接(如:http://oss.vemic.com/test.jsp 注:oss.vemic.com是本地服务器链接)就可以看到发送的消息了。

<%

try {

     ServletContext servletContext = this.getServletContext();

     WebApplicationContext wac = WebApplicationContextUtils

      .getRequiredWebApplicationContext(servletContext);

EN-U