JAVA中利用ActiveMQ收发消息
dh_sue 10年前
ActiveMQ不多说,下现是开启线程进行消息监听,符合条件则发送相应的消息到对方,两者消息均采用队列模式
SMSMQListener.java
/** * */ package com.wxcm.sms; import java.util.HashMap; import java.util.List; import java.util.Map; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.log4j.Logger; import org.springframework.beans.factory.annotation.Autowired; import com.wxcm.sms.service.ChannelService; import com.wxcm.sms.util.MQUtil; import com.wxcm.sms.vo.Channel; import com.wxcm.waf.Configur; /** * @author D.H. Sue * */ public class SMSMQListener implements Runnable { Logger logger = Logger.getLogger(SMSMQListener.class); @Autowired ChannelService channelService; @Autowired Configur configur; Connection connection; Session session; public SMSMQListener() { } public void run() { logger.info("--ActiveMQ Connect--"); ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(" + configur.getProperties().get("sms.mq.url") + ")"); logger.info("--ActiveMQ Connect Success--"); try { connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("stream"); MessageConsumer consumer = session.createConsumer(destination); while (true) { consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { MapMessage mm = (MapMessage) message; try { logger.info("--Msg From GET_ALL_CHANNELS--"); if (mm.getString("type").equals("get_all_channels")) { sendChannelInfo2StreamServerI(mm.getString("queue_name"), Constants.SEND_CHANNEL_2_MQ_ADD); logger.info("--Send Channel Info Done--"); } } catch (JMSException e) { logger.info(e); } } }); } } catch (JMSException e) { logger.info(e); } } private void sendChannelInfo2StreamServerI(String streamServerI, String opType){ try { List<Channel> channelList = (List<Channel>) channelService.listChannels(); Map<String, String> msgMap = new HashMap<String, String>(); if (channelList!=null) { msgMap.put("count", channelList.size()+""); for (int i = 0; i < channelList.size(); i++) { msgMap.put("type_" + i, Constants.SEND_CHANNEL_2_MQ_ADD); msgMap.put("channel_num_" + i, channelList.get(i).getChannelnumber() + ""); msgMap.put("srcurl_" + i, channelList.get(i).getSrcurl()); } MQUtil mqUtil = new MQUtil(configur.getProperties().get("sms.mq.url").toString()); Connection connection = mqUtil.connectMQ(); mqUtil.sendMessage(connection, msgMap, streamServerI); mqUtil.disconnectMQ(connection); } } catch (Exception e) { e.printStackTrace(); } } public void init() throws Exception { Thread thread = new Thread(this); thread.setName("listening"); thread.start(); } public void destroy(){ try { if (session!=null) { session.close(); } } catch (Exception e) { logger.error(e); } try { if (connection!=null) { connection.stop(); connection.close(); } } catch (Exception e) { logger.error(e); } } }下面是MQUtil.java工具类:
package com.wxcm.sms.util; import java.util.Iterator; import java.util.List; import java.util.Map; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.log4j.Logger; /** * ActiveMQ工具类 * @author D.H. Sue * */ public class MQUtil { Logger logger = Logger.getLogger(MQUtil.class); private String url; public String getUrl() { return url; } public void setUrl(String url) { this.url = url; } public MQUtil(String url){ this.url = url; } /** * 连接到ActiveMQ服务器 * @param isProducer * 是否为生产者1表示生产者,0表示消费者 * @return * 不空表示连接成功,空表示连接失败 */ public Connection connectMQ() { ConnectionFactory connectionFactory; Connection connection = null; connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, url); try { connection = connectionFactory.createConnection(); connection.start(); } catch (Exception e) { logger.error(e); } return connection; } /** * 断开连接 * @param connection * ActiveMQ连接 */ public void disconnectMQ(Connection connection){ try { if (connection != null) { connection.stop(); connection.close(); } } catch (Exception e) { logger.error(e.getMessage()); } } /** * 将消息message以topicName为主题发送出去 * @param key * 消息名 * @param message * 消息体 * @param topicName * 主题名称 * @throws Exception */ public void sendMessage(Connection connection, List<String> key, List<String> message, String topicName) { Session session = null; MessageProducer producer; try { session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic(topicName); producer = session.createProducer(topic); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); MapMessage mapMessage = session.createMapMessage(); Iterator<String> keyIt = key.iterator(); Iterator<String> messageIt = message.iterator(); while (keyIt.hasNext() && messageIt.hasNext()) { mapMessage.setString(keyIt.next(), messageIt.next()); } producer.send(mapMessage); session.commit(); } catch (Exception e) { logger.error(e.getMessage()); } finally { try { if (session != null) { session.close(); } } catch (JMSException e) { logger.error(e); } } } public void sendMessage(Connection connection, Map<String, String> msgMap, String streamServerI) { Session session; MessageProducer producer; try { session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(streamServerI); producer = session.createProducer(queue); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); MapMessage mapMessage = session.createMapMessage(); for (Map.Entry<String, String> entity : msgMap.entrySet()) { mapMessage.setString(entity.getKey(), entity.getValue()); } producer.send(mapMessage); session.commit(); } catch (Exception e) { logger.error(e); } } }需要注意一点的是,使用线程来监听MQ消息时,一定要记得断开会话与链接,否则会遗留多个消费者,从而干扰程序的正确运行,因为是启动线程,因此需要在配置文件中配置线程的启动方法与销毁方法:
<bean id="SMSMQListener" class="com.wxcm.sms.SMSMQListener" scope="singleton" lazy-init="false" init-method="init" destroy-method="destroy"/>