AcitveMQ编程实践

13年前
本文主要介绍了ActiveMQ的编程模型,常用的类。以及一个通用的MQ编程模式。方便初学者快速掌握 ActiveMQ的编程方法。

一、             开发 JMS的步骤

一个 JMS 应用是几个 JMS 客户端交换消息,开发 JMS 客户端应用由以下几步构成:

1、用 JNDI 得到 ConnectionFactory 对象;

2、用 ConnectionFactory 创建 Connection 对象;

3、用 Connection 对象创建一个或多个 JMS Session

4、用 JNDI 得到目标队列或主题对象,即 Destination 对象;

5、用 Session Destination 创建 MessageProducer MessageConsumer

6、通知 Connection 开始传送消息。

 

二、             编程模型

1、  ConnectionFactory

要初始化 JMS,则需要使用连接工厂。客户端通过创建 ConnectionFactory建立到 ActveMQ的连接,一个连接工厂封装了一组连接配置参数,这组参数在配置ActiveMQ时已经定义,例如brokerURL参数,此参数传入的是ActiveMQ服务地址和端口,支持openwire协议的默认连接为 tcp://localhost:61616,支持 stomp协议的默认连接为 tcp://localhost:61613

ActiveMQConnectionFactory构造方法:

ActiveMQConnectionFactory();

ActiveMQConnectionFactory(String brokerURL);

ActiveMQConnectionFactory(String userName, String password, String b rokerURL) ;

ActiveMQConnectionFactory(String userName, String password, URI brok erURL) ;

ActiveMQConnectionFactory(URI brokerURL);

其中 brokerURLActiveMQ服务地址和端口。

例如:

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192. 168.0.135:61616");

或者

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();

connectionFactory. setBrokerURL("tcp://192.168.0.135:61616");

 

 

2Connection

在成功创建正确的ConnectionFactory后,下一步将是创建一个连接,它是JMS定义的一个接口。ConnectionFactory负责返回可以与底层消息传递系统进行通信的Connection实现。通常客户端只使用单一连接。根据JMS文档,Connection的目的是“利用 JMS提供者封装开放的连接”,以及表示“客户端与提供者服务例程之间的开放TCP /IP套接字”。该文档还指出 Connection应该是进行客户端身份验证的地方,除了其他一些事项外,客户端还可以指定惟一标志符。

当一个Connection被创建时,它的传输默认是关闭的,必须使用start方法开启。一个Connection可以建立一个或多个的Session。当一个程序执行完成后,必须关闭之前创建的Connection,否则 ActiveMQ不能释放资源,关闭一个Connection同样也关闭了 SessionMessageProducerMessageConsumer

Connection支持并发。

2.1、创建Connection

ActiveMQConnectionFactory方法:

Connection createConnection()

Connection createConnection(String userName, String password);

 

2.2、开启 Connection

void start();

如:connection.start();

 

2.3关闭 Connection

void close();

如:connection.close();

 

3、                Session

一旦从ConnectionFactory中获得一个Connection,就必须从Connection中创建一个或者多个SessionSession是一个发送或接收消息的线程,可以使用Session创建 MessageProducerMessageConsumerMessage

Session可以被事务化,也可以不被事务化。通常可以通过向Connection上的适当创建方法传递一个布尔参数对此进行设置。

Session createSession(boolean transacted, int acknowledgeMode);

其中transacted为使用事务标识,acknowledgeMode为签收模式。

如:Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

 

4、                Destination

Destination是一个客户端用来指定生产消息目标和消费消息来源的对象。

PTP模式中,Destination被称作Queue即队列;在Pub/Sub模式,Destination被称作Topic即主题。在程序中可以使用多个QueueTopic

Queue createQueue(String queueName);

TemporaryQueue createTemporaryQueue();

Topic createTopic(String topicName);

TemporaryTopic createTemporaryTopic();

如: Destination destination = session.createQueue("TEST.FOO");

 

5、                MessageProducer

MessageProducer是一个由Session创建的对象,用来向Destination发送消息。

5.1、创建MessageProducer

MessageProducer createProducer(Destination destination);

如:MessageProducer producer = session.createProducer(destination);

 

5.2 发送消息

void send(Destination destination, Message message);

void send(Destination destination, Message message, int deliveryMode, in tpriority, long timeToLive);

void send(Message message);

void send(Message message, int deliveryMode, int priority, long timeToLive);

其中deliveryMode为传送模式,priority为消息优先级,timeToLive为消息过期时间。

如:producer.send(message);

 

6、                MessageConsumer

MessageConsumer是一个由Session创建的对象,用来从Destination接收消息。

6.1、创建MessageConsumer

MessageConsumer createConsumer(Destination destination);

MessageConsumer createConsumer(Destination destination, String messageSelector);

MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal);

TopicSubscriber createDurableSubscriber(Topic topic, String name);

TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal);

其中messageSelector为消息选择器;noLocal标志默认为false,当设置为true时限制消费者只能接收和自己相同的连接(Connection)所发布的消息,此标志只适用于主题,不适用于队列;name标识订阅主题所对应的订阅名称,持久订阅时需要设置此参数。

如:MessageConsumer consumer = session.createConsumer(destination);

 

6.2、消息的同步与异步接收

消息的同步接收是指客户端主动去接收消息,客户端可以采用MessageConsumerreceive方法去接收下一个消息。

消息的异步接收是指当消息到达时,ActiveMQ主动通知客户端。客户端可以通过注册一个实现 MessageListener接口的对象到MessageConsumerMessageListener只有一个必须实现的方法onMessage,它只接收一个参数,即Message。在为每个发送到Destination的消息实现onMessage时,将调用该方法。

Message receive()

Message receive(long timeout)

Message receiveNoWait()

其中timeout为等待时间,单位为毫秒。

或者实现MessageListener接口,每当消息到达时,ActiveMQ会调用MessageListener中的

onMessage函数。

如:Message message = consumer.receive();

 

6.3、消息选择器

JMS提 供了一种机制,使用它,消息服务可根据消息选择器中的标准来执行消息过滤。生产者可在消息中放入应用程序特有的属性,而消费者可使用基于这些属性的选择标 准来表明对消息是否感兴趣。这就简化了客户端的工作,并避免了向不需要这些消息的消费者传送消息的开销。然而,它也使得处理选择标准的消息服务增加了一些 额外开销。

消息选择器是用于MessageConsumer的过滤器,可以用来过滤传入消息的属性和消息头部分(但不过滤消息体),并确定是否将实际消费该消息。按照JMS文档的说法,消息选择器是一些字符串,它们基于某种语法,而这种语法是SQL-92的子集。可以将消息选择器作为 MessageConsumer创建的一部分。

如:public final String SELECTOR = “JMSType = ‘TOPIC_PUBLISHER’”;

该选择器检查了传入消息的JMSType属性,并确定了这个属性的值是否等于 TOPIC _PUBLISHER。如果相等,则消息被消费;如果不相等,那么消息会被忽略。