ActiveMQ 使用示例
jopen
10年前
ActiveMQ 的使用非常简单,和JDBC一样,有标准的步骤:
1. 创建连接工厂
2. 创建连接
3. 创建会话
4. 创建目的地
5. 创建生产者或消费者
6. 生产或消费消息
7. 关闭生产或消费者、关闭会话、关闭连接
一个生产者例子如下:
class ActiveMQProducer implements Runnable { public void run() { try { // 创建连接工厂 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); // 创建JMS连接实例,并启动连接 Connection connection = connectionFactory.createConnection(); connection.start(); // 创建Session对象,不开启事务 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建目标,可以是 Queue 或 Topic Destination destination = session.createQueue("ling.wcaccepted"); // 创建生成者 MessageProducer producer = session.createProducer(destination); // 设置消息不需持久化。默认消息需要持久化 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // 创建文本消息 TextMessage message = session.createTextMessage("Hello World!"); // 发送消息。non-persistent 默认异步发送;persistent 默认同步发送 producer.send(message); // 关闭会话和连接 producer.close(); session.close(); connection.close(); } catch(Exception e) { e.printStackTrace(); } } }
一个消费者例子如下:
class ActiveMQConsumer implements Runnable { public void run() { try { // 创建连接工厂 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://115.29.145.245:61616"); // 创建JMS连接实例,并启动连接 Connection connection = connectionFactory.createConnection(); connection.start(); // 创建Session对象,不开启事务 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建目标,可以是 Queue 或 Topic Destination destination = session.createQueue("ling.wcaccepted"); // 创建消费者 MessageConsumer consumer = session.createConsumer(destination); // 获取消息 System.out.println(consumer.receive()); // 关闭会话和连接 consumer.close(); session.close(); connection.close(); } catch(Exception e) { } } }
ActiveMQ 的 Session、MessageProducer 和 MessageConsumer 类是非线程安全的,不能在多线程中共享。
结合Spring使用
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd"> <!-- ActiveMQ 连接池配置 --> <bean id="activemqPoolFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"> <property name="connectionFactory"> <bean class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL"> <value>tcp://localhost:61616</value> </property> </bean> </property> </bean> <!-- Spring JMS Template, 默认开启消息持久化 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate" c:connectionFactory-ref="activemqPoolFactory"/> <!-- ActiveMQ 消费者方法配置,下面配置生成2个listener --> <jms:listener-container connection-factory="activemqPoolFactory"> <jms:listener destination="ling.activity" ref="activemqClient" method="lingActivity"/> <jms:listener destination="ling.delete" ref="activemqClient" method="lingDelete"/> </jms:listener-container> <!-- ActiveMQ 消费者工具类配置 --> <bean id="activemqClient" class="com.winhong.ling.utils.ActivemqClient"/> </beans>
ActiveMQ 消费者工具类代码:
import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import com.alibaba.fastjson.JSON; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; /** * 消息队列工具类。 * * @author ningjh * @version 1.0 * @since 2015/1/4 */ public class ActivemqClient { public final static String QUEUE_ACTIVITY = "ling.activity"; public final static String QUEUE_DELETE = "ling.delete"; @Autowired private JmsTemplate jmsTemplate; /** * 发送文本消息 * * @param destination 队列名称 * @param message 文本消息内容 */ public void sendMessage(String destination, final String message) { jmsTemplate.send(destination, new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(message); } }); } /** * 消费者监听器方法。监听队列 ling.activity * * @param message */ public void lingActivity(String message) { System.out.println("接收到消息-activity: " + message); } /** * 消费者监听器方法。监听队列 ling.delete * * @param message */ public void lingDelete(String message) { System.out.println("接收到消息-delete: " + message); } }