ActiveMQ简述

lizengkui 8年前
   <p> </p>    <h2>概述</h2>    <p>ActiveMQ是Apache所提供的一个开源的消息系统,完全采用 Java 来实现,因此,它能很好地支持J2EE提出的JMS(Java Message Service,即Java消息服务)规范。JMS是一组Java应用程序接口,它提供消息的创建、发送、读取等一系列服务。JMS提供了一组公共应用程序接口和响应的语法,类似于Java 数据库 的统一访问接口JDBC,它是一种与厂商无关的API,使得Java程序能够与不同厂商的消息组件很好地进行通信。</p>    <p>JMS支持两种消息发送和接收模型。一种称为P2P(Ponit to Point)模型,即采用点对点的方式发送消息。P2P模型是基于队列的,消息生产者发送消息到队列,消息消费者从队列中接收消息,队列的存在使得消息的异步传输称为可能,P2P模型在点对点的情况下进行消息传递时采用。</p>    <p><img src="https://simg.open-open.com/show/69ba35a5cabf03f8068b64a20661cee4.png"></p>    <p>另一种称为Pub/Sub(Publish/Subscribe,即发布-订阅)模型,发布-订阅模型定义了如何向一个内容节点发布和订阅消息,这个内容节点称为topic(主题)。主题可以认为是消息传递的中介,消息发布这将消息发布到某个主题,而消息订阅者则从主题订阅消息。主题使得消息的订阅者与消息的发布者互相保持独立,不需要进行接触即可保证消息的传递,发布-订阅模型在消息的一对多广播时采用。</p>    <p><img src="https://simg.open-open.com/show/88c5c4c82535b18704f3a63fd1513a90.png"></p>    <h2>ActiveMQ的安装</h2>    <p>下载最新的安装包apache-activemq-5.13.2-bin.tar.gz(此包linux下的,案例也是针对linux系统进行阐述,当然ActiveMQ也有win版的,这里就不赘述了),可以去 <a href="/misc/goto?guid=4958323481424978975" rel="nofollow,noindex">官网</a> 下载,也可以在下方留言区留下你的邮箱,博主会发给你的~</p>    <p>下载之后解压: tar -zvxf apache-activemq-5.13.2-bin.tar.gz</p>    <p>ActiveMQ目录内容有:</p>    <ul>     <li>bin目录包含ActiveMQ的启动脚本</li>     <li>conf目录包含ActiveMQ的所有配置文件</li>     <li>data目录包含日志文件和持久性消息数据</li>     <li>example: ActiveMQ的示例</li>     <li>lib: ActiveMQ运行所需要的lib</li>     <li>webapps: ActiveMQ的web控制台和一些相关的demo</li>    </ul>    <p>运行命令: <strong>activemq start</strong> (在activemq/bin下运行)</p>    <pre>  <code>INFO: Loading '/users/shr/apache-activemq-5.13.2//bin/env'  INFO: Using java '/users/shr/util/JavaDir/jdk/bin/java'  INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details  INFO: pidfile created : '/users/shr/apache-activemq-5.13.2//data/activemq.pid' (pid '986')</code></pre>    <p>查看activemq是否运行命令: <strong>ps -aux | grep activemq</strong></p>    <pre>  <code>shr        986  1.2  9.7 1281720 201936 pts/5  Sl   19:43   0:17 /users/shr/util/JavaDir/jdk/bin/java -Xms64M -Xmx1G -Djava.util.logging.config.file=logging.properties -Djava.security.auth.login.config=/users/shr/apache-activemq-5.13.2//conf/login.config -Dcom.sun.management.jmxremote -Djava.awt.headless=true -Djava.io.tmpdir=/users/shr/apache-activemq-5.13.2//tmp -Dactivemq.classpath=/users/shr/apache-activemq-5.13.2//conf:/users/shr/apache-activemq-5.13.2//../lib/: -Dactivemq.home=/users/shr/apache-activemq-5.13.2/ -Dactivemq.base=/users/shr/apache-activemq-5.13.2/ -Dactivemq.conf=/users/shr/apache-activemq-5.13.2//conf -Dactivemq.data=/users/shr/apache-activemq-5.13.2//data -jar /users/shr/apache-activemq-5.13.2//bin/activemq.jar start  shr       1501  0.0  0.0   5176   724 pts/5    S+   20:06   0:00 grep activemq</code></pre>    <p>关闭命令: <strong>activemq stop</strong></p>    <pre>  <code>INFO: Loading '/users/shr/apache-activemq-5.13.2//bin/env'  INFO: Using java '/users/shr/util/JavaDir/jdk/bin/java'  INFO: Waiting at least 30 seconds for regular process termination of pid '986' :  Java Runtime: Oracle Corporation 1.7.0_79 /users/shr/util/JavaDir/jdk1.7.0_79/jre    Heap sizes: current=63232k  free=62218k  max=932096k      JVM args: -Xms64M -Xmx1G -Djava.util.logging.config.file=logging.properties -Djava.security.auth.login.config=/users/shr/apache-activemq-5.13.2//conf/login.config -Dactivemq.classpath=/users/shr/apache-activemq-5.13.2//conf:/users/shr/apache-activemq-5.13.2//../lib/: -Dactivemq.home=/users/shr/apache-activemq-5.13.2/ -Dactivemq.base=/users/shr/apache-activemq-5.13.2/ -Dactivemq.conf=/users/shr/apache-activemq-5.13.2//conf -Dactivemq.data=/users/shr/apache-activemq-5.13.2//data  Extensions classpath:    [/users/shr/apache-activemq-5.13.2/lib,/users/shr/apache-activemq-5.13.2/lib/camel,/users/shr/apache-activemq-5.13.2/lib/optional,/users/shr/apache-activemq-5.13.2/lib/web,/users/shr/apache-activemq-5.13.2/lib/extra]  ACTIVEMQ_HOME: /users/shr/apache-activemq-5.13.2  ACTIVEMQ_BASE: /users/shr/apache-activemq-5.13.2  ACTIVEMQ_CONF: /users/shr/apache-activemq-5.13.2/conf  ACTIVEMQ_DATA: /users/shr/apache-activemq-5.13.2/data  Connecting to pid: 986  ..Stopping broker: localhost  .. TERMINATED</code></pre>    <p>ActiveMQ的默认服务端口为61616,这个可以在conf/activemq.xml配置文件中修改:</p>    <pre>  <code class="language-xml"><transportConnectors>      <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->      <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>  </transportConnectors></code></pre>    <p>在下载的apache-activemq-5.13.2-bin.tar.gz包中解压有一个jar包:activemq-all-5.13.2.jar,引入这个jar到你的项目中即可开始编写案例代码。</p>    <p>博主的activemq服务器地址为10.10.195.187,这个在下面代码中会有体现。</p>    <p>按照JMS的规范,我们首先需要获得一个JMS connection factory.,通过这个connection factory来创建connection.在这个基础之上我们再创建session, destination, producer和consumer。因此主要的几个步骤如下:</p>    <ol>     <li>获得JMS connection factory. 通过我们提供特定环境的连接信息来构造factory。</li>     <li>利用factory构造JMS connection</li>     <li>启动connection</li>     <li>通过connection创建JMS session.</li>     <li>指定JMS destination.</li>     <li>创建JMS producer或者创建JMS message并提供destination.</li>     <li>创建JMS consumer或注册JMS message listener.</li>     <li>发送和接收JMS message.</li>     <li>关闭所有JMS资源,包括connection, session, producer, consumer等。</li>    </ol>    <p>下面来看代码举例(P2P式)。通过Java实现的基于ActiveMQ的请求提交:</p>    <pre>  <code class="language-java">package com.zzh.activemq;    import java.io.Serializable;  import java.util.HashMap;    import javax.jms.Connection;  import javax.jms.ConnectionFactory;  import javax.jms.DeliveryMode;  import javax.jms.Destination;  import javax.jms.MessageProducer;  import javax.jms.ObjectMessage;  import javax.jms.Session;    import org.apache.activemq.ActiveMQConnection;  import org.apache.activemq.ActiveMQConnectionFactory;    public class RequestSubmit  {      //消息发送者      private MessageProducer producer;      //一个发送或者接受消息的线程      private Session session;        public void init() throws Exception      {          //ConnectionFactory连接工厂,JMS用它创建连接          ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(                  ActiveMQConnection.DEFAULT_USER,                  ActiveMQConnection.DEFAULT_PASSWORD,                  "tcp://10.10.195.187:61616");          //Connection:JMS客户端到JMS Provider的连接,从构造工厂中得到连接对象          Connection connection = connectionFactory.createConnection();          //启动          connection.start();          //获取连接操作          session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);          Destination destinatin = session.createQueue("RequestQueue");          //得到消息生成(发送)者          producer = session.createProducer(destinatin);          //设置不持久化          producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);      }        public void submit(HashMap<Serializable,Serializable> requestParam) throws Exception      {          ObjectMessage message = session.createObjectMessage(requestParam);          producer.send(message);          session.commit();      }        public static void main(String[] args) throws Exception{          RequestSubmit submit = new RequestSubmit();          submit.init();          HashMap<Serializable,Serializable> requestParam = new HashMap<Serializable,Serializable>();          requestParam.put("朱小厮", "zzh");          submit.submit(requestParam);      }  }</code></pre>    <p>创建Session时有两个非常重要的参数,第一个boolean类型的参数用来表示是否采用事务消息。如果是事务消息,对于的参数设置为true,此时消息的提交自动有comit处理,消息的回滚则自动由rollback处理。加入消息不是事务的,则对应的该参数设置为false,此时分为三种情况:</p>    <ul>     <li>Session.AUTO_ACKNOWLEDGE表示Session会自动确认所接收到的消息。</li>     <li>Session.CLIENT_ACKNOWLEDGE表示由客户端程序通过调用消息的确认方法来确认所接收到的消息。</li>     <li>Session.DUPS_OK_ACKNOWLEDGE使得Session将“懒惰”地确认消息,即不会立即确认消息,这样有可能导致消息重复投递。</li>    </ul>    <p>提供Java实现的基于ActiveMQ的请求处理:</p>    <pre>  <code class="language-java">package com.zzh.activemq;    import java.io.Serializable;  import java.util.HashMap;  import java.util.Map;    import javax.jms.Connection;  import javax.jms.ConnectionFactory;  import javax.jms.Destination;  import javax.jms.MessageConsumer;  import javax.jms.ObjectMessage;  import javax.jms.Session;    import org.apache.activemq.ActiveMQConnection;  import org.apache.activemq.ActiveMQConnectionFactory;    public class RequestProcessor  {      public void requestHandler(HashMap<Serializable,Serializable> requestParam) throws Exception      {          System.out.println("requestHandler....."+requestParam.toString());          for(Map.Entry<Serializable, Serializable> entry : requestParam.entrySet())          {              System.out.println(entry.getKey()+":"+entry.getValue());          }      }        public static void main(String[] args) throws Exception      {          ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(                  ActiveMQConnection.DEFAULT_USER,                  ActiveMQConnection.DEFAULT_PASSWORD,                  "tcp://10.10.195.187:61616");          Connection connection = connectionFactory.createConnection();          connection.start();          Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);          Destination destination = session.createQueue("RequestQueue");          //消息消费(接收)者          MessageConsumer consumer = session.createConsumer(destination);            RequestProcessor processor = new RequestProcessor();            while(true)          {              ObjectMessage message = (ObjectMessage) consumer.receive(1000);              if(null != message)              {                  System.out.println(message);                  HashMap<Serializable,Serializable> requestParam = (HashMap<Serializable,Serializable>) message.getObject();                  processor.requestHandler(requestParam);              }              else              {                  break;              }          }      }  }</code></pre>    <p>输出结果:</p>    <pre>  <code>ActiveMQObjectMessage {commandId = 6, responseRequired = false, messageId = ID:hidden-PC-58748-1460550507055-1:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:hidden-PC-58748-1460550507055-1:1:1:1, destination = queue://RequestQueue, transactionId = TX:ID:hidden-PC-58748-1460550507055-1:1:1, expiration = 0, timestamp = 1460550507333, arrival = 0, brokerInTime = 1460550505969, brokerOutTime = 1460550509143, correlationId = null, replyTo = null, persistent = false, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@74a456bb, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false}  requestHandler.....{朱小厮=zzh}  朱小厮:zzh</code></pre>    <p>可以通过页面查看队列的使用情况,在浏览器中输入 <a href="/misc/goto?guid=4959673147068340717" rel="nofollow,noindex">http://10.10.195.187:8161/admin/queues.jsp</a> ,用户名和密码都是:admin,看到以下页面:</p>    <p><img src="https://simg.open-open.com/show/2b9dab18c15489376574004e60dd6ebc.png"></p>    <p>这个是在jetty服务器下跑的,可以修改conf/jetty.xml来修改相关jetty配置。</p>    <p>上面的例子是关于P2P模式的,不过有个不妥之处,就是没有资源的释放。下面举一个Pub/Sub模式的。通过JMS创建ActiveMQ的topic,并给topic发送消息:</p>    <pre>  <code class="language-java">import javax.jms.Connection;  import javax.jms.ConnectionFactory;  import javax.jms.DeliveryMode;  import javax.jms.JMSException;  import javax.jms.MessageProducer;  import javax.jms.ObjectMessage;  import javax.jms.Session;  import javax.jms.TextMessage;  import javax.jms.Topic;    import org.apache.activemq.ActiveMQConnection;  import org.apache.activemq.ActiveMQConnectionFactory;  import org.apache.camel.Produce;    public class TopicRequest  {      //消息发送者      private MessageProducer producer;      //一个发送或者接受消息的线程      private Session session;      //Connection:JMS客户端到JMS Provider的连接      private Connection connection;        public void init() throws Exception      {          //ConnectionFactory连接工厂,JMS用它创建连接          ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(                  ActiveMQConnection.DEFAULT_USER,                  ActiveMQConnection.DEFAULT_PASSWORD,                  "tcp://10.10.195.187:61616");          //从构造工厂中得到连接对象          connection = connectionFactory.createConnection();          //启动          connection.start();          //获取连接操作          session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);          Topic topic = session.createTopic("MessageTopic");          producer = session.createProducer(topic);          //设置不持久化          producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);      }        public void submit(String mess) throws Exception      {          TextMessage message = session.createTextMessage();          message.setText(mess);          producer.send(message);      }        public void close()      {          try          {              if(session != null)                  session.close();              if(producer != null)                  producer.close();              if(connection !=null )                  connection.close();          }          catch (JMSException e)          {              e.printStackTrace();          }      }        public static void main(String[] args) throws Exception      {          TopicRequest topicRequest = new TopicRequest();          topicRequest.init();          topicRequest.submit("I'm first");          topicRequest.close();      }  }</code></pre>    <p>消息发送到对应的topic后,需要将listener注册到需要订阅的topic上,以便能够接收该topic的消息:</p>    <pre>  <code class="language-java">import javax.jms.Connection;  import javax.jms.ConnectionFactory;  import javax.jms.JMSException;  import javax.jms.Message;  import javax.jms.MessageConsumer;  import javax.jms.MessageListener;  import javax.jms.Session;  import javax.jms.TextMessage;  import javax.jms.Topic;    import org.apache.activemq.ActiveMQConnection;  import org.apache.activemq.ActiveMQConnectionFactory;    public class TopicReceive  {      private MessageConsumer consumer;      private Session session;        public void init() throws Exception      {          ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(                  ActiveMQConnection.DEFAULT_USER,                  ActiveMQConnection.DEFAULT_PASSWORD,                  "tcp://10.10.195.187:61616");          Connection connection = connectionFactory.createConnection();          connection.start();          session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);          Topic topic = session.createTopic("MessageTopic");          consumer = session.createConsumer(topic);            consumer.setMessageListener(new MessageListener(){              @Override              public void onMessage(Message message)              {                  TextMessage tm = (TextMessage) message;                  System.out.println(tm);                  try                  {                      System.out.println(tm.getText());                  }                  catch (JMSException e)                  {                      e.printStackTrace();                  }              }          });      }        public static void main(String[] args) throws Exception      {          TopicReceive receive = new TopicReceive();          receive.init();      }  }</code></pre>    <p>输出结果:</p>    <pre>  <code>ActiveMQTextMessage {commandId = 5, responseRequired = false, messageId = ID:hidden-PC-50073-1460597487065-1:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:hidden-PC-50073-1460597487065-1:1:1:1, destination = topic://MessageTopic, transactionId = null, expiration = 0, timestamp = 1460597487308, arrival = 0, brokerInTime = 1460597487297, brokerOutTime = 1460597487298, correlationId = null, replyTo = null, persistent = false, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@2e4d3abf, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = I'm first}  I'm first</code></pre>    <p>参考文献</p>    <p>1. 《大型分布式网站 <a href="/misc/goto?guid=4959673147154813799" rel="nofollow,noindex">架构</a> ——设计与实践》陈康贤著。</p>    <p>2.  <a href="/misc/goto?guid=4958323481424978975" rel="nofollow,noindex">http://activemq.apache.org/</a></p>    <p> </p>    <p>via: http://www.importnew.com/19690.html</p>