Spring整合activeMQ

jopen 9年前

项目要用到总线技术,今天研究了一下Spring编程的方法整合activeMQ:

1、注入连接工厂:

@Bean
public ActiveMQConnectionFactory activeMQConnectionFactory(){
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
    activeMQConnectionFactory.setBrokerURL(this.mqURL);
    return activeMQConnectionFactory;
}
注意此处的ActiveMQConnectionFactory是org.apache.activemq.spring包中的,而不是org.apache.activemq包中的类。

mqURL从外部配置文件中注入,形式如tcp://192.168.1.1:61616

2、注入消息目的地

@Bean
Topic alarmMessageTopic(){
    Topic topic = () -> DeviceAlarmMessage.alarm_topic;
    return topic;
}

此处我用的是订阅发布的模式,如果用点对点队列模式可以创建Queue。我用的J2EE的topic接口,用lambda表达式给接口的函数赋值,也可以用activeMQ的ActiveMQTopic和ActiveMQQueue。


3、注入JMS模板

@Bean
JmsTemplate alarmJmsTemplate(){
    JmsTemplate jmsTemplate = new JmsTemplate();
    jmsTemplate.setConnectionFactory(this.activeMQConnectionFactory());
    jmsTemplate.setDefaultDestination(this.alarmMessageTopic());
    return jmsTemplate;
}

指定了连接工厂和topic。下面会看到收消息用消息驱动POJO,所以JmsTemplate应该主要用于发消息,我现在的系统无发消息的需求,所以可以不注入JmsTemplate。

4、创建消息驱动POJO机制

4.1 首先创建并注入消息处理类:

public class DeviceAlarmHandler {
    public void processDeviceAlarm(DeviceAlarmMessage alarmMessage){
        System.out.println(alarmMessage.toString());
    }
}

这个类包含了一个方法,处理收到的DeviceAlarmMessage 对象。

@Bean
public DeviceAlarmHandler deviceAlarmHandler(){
    return new DeviceAlarmHandler();
}

4.2 注入MessageListenerAdapter

@Bean
public MessageListenerAdapter alarmMessageListener(){
    MessageListenerAdapter listener = new MessageListenerAdapter(this.deviceAlarmHandler());
    listener.setDefaultListenerMethod("processDeviceAlarm");
    return listener;
}
指定了消息处理类和消息处理方法

4.3 注入DefaultMessageListenerContainer

@Bean
public DefaultMessageListenerContainer alarmMessageListenerContainer(){
    DefaultMessageListenerContainer listenerContainer = new DefaultMessageListenerContainer();
    listenerContainer.setConnectionFactory(this.activeMQConnectionFactory());
    listenerContainer.setMessageListener(this.alarmMessageListener());
    listenerContainer.setDestination(this.alarmMessageTopic());
    return listenerContainer;
}
指定连接工厂、messageListenerAdapter和topic。

注:此处也可以不用Spring消息驱动POJO的机制,改用实现MessageListener接口的类。

5、上述已经配置完成。但启动web应用后,收消息的时候会报禁止序列化类DeviceAlarmMessage的错误。在Spring启动配置类(实现了WebApplicationInitializer接口的类)中加入属性:

System.setProperty("org.apache.activemq.SERIALIZABLE_PACKAGES", "com.sfauto.site.entities");

该属性表示可以序列化类的包。

如果不直接发送对象,即不用序列化,应不用加入此属性。

来自: http://my.oschina.net/u/2453016/blog/596940