ActiveMQ 使用说明

jopen 9年前

1介绍

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。

ActiveMQ最简单的形式就是如下图所示:

参与方有消息生产者、消息存储转发者、消费者

消息生产者:负责生产各种消息,发送到broker集群

消息存储转发者:broker集群,负责接收生产者生产的消息,向消费者提供消息

消费者:接收消息的一方

2 安装

activemq安装很简单,解压包至指定位置就可以,这里使用apache-activemq-5.10.0

Windowsapache-activemq-5.10.0-bin.zip

UNIX:apache-activemq-5.10.0-bin.tar.gz

3 启动

Windows:%ACTIVE_MQ_HOME%bin\win64\activemq.bat

UNIX:nohup  $ACTIVE_MQ_HOME/bin/activemq start < /tmp/smlog 2<&1 &

ActiveMQ默认使用的TCP连接端口是61616, 通过查看该端口的信息可以测试ActiveMQ是否成功启动

检查已经启动

查看61616端口是否打开: netstat -an | grep 61616

web管理界面http://10.21.210.43:8161/admin/默认账号密码:admin/admin


4 queue和topic

1、JMS Queue 执行 load balancer语义:

一条消息仅能被一个consumer(消费者)收到。如果在message发送的时候没有可用的consumer,那么它将被保存一直到能处理该 message的consumer可用。如果一个consumer收到一条message后却不响应它,那么这条消息将被转到另一个consumer那儿。一个Queue可以有很多consumer,并且在多个可用的consumer中负载均衡。


点对点消息传递域的特点如下:

A.每个消息只能有一个消费者。

B.消息的生产者和消费者之间没有时间上的相关性。无论消费者在生产者发送消息的时候是否处于运行状态,它都可以提取消息。


2、Topic 实现 publish和 subscribe 语义:

一条消息被publish时,它将发到所有感兴趣的订阅者,所以零到多个subscriber将接收到消息的一个拷贝。但是在消息代理接收到消息时,只有激活订阅的subscriber能够获得消息的一个拷贝。

发布/订阅消息传递域的特点如下:

A.每个消息可以有多个消费者。

B.生产者和消费者之间有时间上的相关性。订阅一个主题的消费者只能消费自它订阅之后发布的消息。JMS 规范允许客户创建持久订阅,这在一定程度上放松了时间上的相关性要求。持久订阅允许消费者消费它在未处于激活状态时发送的消息。


3、分别对应两种消息模式:

Point-to-Point (点对点),Publisher/Subscriber Model (发布/订阅者) 其中在 Publicher /Subscriber 模式下又有Nondurable subscription(非持久订阅)和 durable subscription (持久化订阅)2种消息处理方式(支持离线消息)。


A.在点对点消息传递域中,目的地被称为队列(queue);在发布/订阅消息传递域中,目的地被成为主题(topic)。

B.非持久化订阅:当consumer不在线时,broker(存储、转发消息部件)不会 为订阅者保存消息

C.持久化订阅:当consumer不在线时,broker(存储、转发消息部件)会为 订阅者保存消息,当consumer上线后,broker会把所有缓存的消息发送给该consumer即订阅者


4、非持久化传输和持久化传输

非持久化传输:当broker接收到provider生产的消息后,回复一个确认ACK,然后把消息保存在内存中,consumer从broker的内存中读取消息,这种方式吞吐量较高,缺点是当某个broker挂掉时,消息会丢失。如果消息积压过多,broker会把消息写到临时文件中,消费时从文件中读取消息,这样会降低吞吐量。Broker一旦重启,这些临时文件也会被删除。

持久化传输:当broker接收到provider生产的消息后,回复一个确认ACK,然后把消息保存在存储介质中,例如文件、MYSql、Oracle等。consumer消费消息时,从这些存储介质中读取,这种方式比较可靠,但是会降低吞吐量。

5 集群模式(amq的重要机制)

broker cluster + Networks of Brokers

让broker知道其他broker,客户端连接的一个broker挂了时,客户端自动连接到其他broker

#更改默认的conf/activemq.xml
#这里使用了静态发现,还有一种动态发现
"static:(tcp://10.21.210.43:61616,tcp://10.21.210.44:61616,tcp://10.21.210.45:61616)"/<
</div> </td> </tr> </tbody> </table> </div> </div> </div> </div>

消费同一个队列时,cosumer连接的broker挂掉,可以连接其他broker继续consume

#生产者和消费者使用failover来连接broker cluster
ConnectionFactory connectionFactory = newActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,
"failover:(tcp://10.21.210.43:61616,tcp://10.21.210.44:61616)");
</div> </td> </tr> </tbody> </table> </div> </div> </div> </div>

往队列produce时,客户端会连上failover中的其中一台传输数据,只有该broker挂掉,才会连failover中其他一台broker.

当集群中所有broker都挂掉后,客户端会阻塞等待,只要有broker能恢复过来,客户端就能自动连接上。