activemq实例

kongna 9年前

来自: http://my.oschina.net/goudingcheng/blog/608872


ACTIVEMQ入门实例
1.下载版本版本:apache-activemq-5.4.1
2.解压
运行apache-activemq-5.4.1\bin下的activemq
Failed to start ActiveMQ JMS Message Broker. Reason: java.io.EOFException: Chunk stream does not exist
若出现以上情况:就参考这个文章http://www.cnblogs.com/kaka/archive/2012/03/15/2398215.html在conf下的activemq。xml里面<broker
添加 schedulerSupport="false"
3.创建一个queen
package com.activemq.jmsApplication01;

import java.util.Properties;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.omg.CORBA.Context;
/*
 * Sending a jms MESSAGE
 * author paohaijiao
 * date 2016/01/18
 *  
 *
 * */
public class MyMessageProducer {
    static boolean useTransation = false;
    static ConnectionFactory connectionFactory;
    static Connection connection;
    static Session session;
    static Destination destination;
    static MessageProducer producer;
    static Message message;

    public static void init() {

        try {
            /*
            
            InitialContext ctx = new InitialContext();// 1.acquire a jms
                                                        // connection factory
            connectionFactory = (ConnectionFactory) ctx
                    .lookup("ConnectionFactoryName");*/
             connectionFactory = new ActiveMQConnectionFactory(
                        ActiveMQConnection.DEFAULT_USER,
                        ActiveMQConnection.DEFAULT_PASSWORD,
                        "tcp://localhost:61616");
            connection = connectionFactory.createConnection();// 2 create a jms
                                                                // using the
                                                                // connection
                                                                // factory
            connection.start();// 3.start connection
            session = connection.createSession(useTransation,
                    Session.AUTO_ACKNOWLEDGE);// 4.create jms session from a
                                                // connection
            destination = session.createQueue("TEST.QUEUE");// 5,acquire jms
                                                            // destination
            producer = session.createProducer(destination);// 6create jms
                                                            // producer,or
                                                            // create message
                                                            // and addr to a
                                                            // destination
            // 7.create jms consumer or registera jms message listerner
            message = session.createTextMessage("this is text");//
            producer.send(message);// 8.send /recieve jms message
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } finally {
            try {
                // 9.Release Resource
                producer.close();
                session.close();
                connection.close();
            } catch (JMSException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

        }

    }
}
4.这时查看http://localhost:8161/admin/ 里面的queens就有记录了

Name Number Of Pending Messages   Number Of Consumers   Messages Enqueued   Messages Dequeued   Views   Operations  
example.A 0 1 0 0 BrowseActive Consumers
       
Send To    Purge    Delete
TEST.QUEUE 1 0 1 0 BrowseActive Consumers
       
Send To    Purge    Delete

 

Number Of Consumers  消费者 这个是消费者端的消费者数量

Number Of Pending Messages 等待消费的消息 这个是当前未出队列的数量。可以理解为总接收数-总出队列数
Messages Enqueued 进入队列的消息  进入队列的总数量,包括出队列的。 这个数量只增不减
Messages Dequeued 出了队列的消息  可以理解为是消费这消费掉的数

package com.activemq.jmsApplication01;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.InitialContext;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class MyAsyncMessageConsumer implements MessageListener {
    static boolean useTransation = false;
    static ConnectionFactory connectionFactory;
    static Connection connection;
    static Session session;
    static Destination destination;
    static MessageConsumer consumer;
    static Message message;

    public void RecieveMessage() {
        try {
        //    InitialContext ctx = new InitialContext();

            //connectionFactory = (ConnectionFactory) ctx.lookup("ConnectionFactoryName");
            
             connectionFactory = new ActiveMQConnectionFactory(
                        ActiveMQConnection.DEFAULT_USER,
                        ActiveMQConnection.DEFAULT_PASSWORD,
                        "tcp://localhost:61616");
             
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(useTransation,
                    Session.AUTO_ACKNOWLEDGE);
            destination = session.createQueue("TEST.QUEUE");
            consumer = session.createConsumer(destination);
            consumer.setMessageListener(this);

        } catch (Exception e) {

            e.printStackTrace();
        } finally {
            try {
                connection.close();
                session.close();
            } catch (JMSException e) {
                // TODO Auto-generated catch block                                                                               
                e.printStackTrace();
            }
        }
    }

    @Override
    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            System.out.println("Recieve message:" + message);

        }

    }

}若采用同步:代码如下

package com.activemq.jmsApplication01;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class MySyncMessageConsumer {
    static boolean useTransation = false;
    static ConnectionFactory connectionFactory;
    static Connection connection;
    static Session session;
    static Destination destination;
    static MessageConsumer consumer;


    public void RecieveMessage() {

        try {
            /*InitialContext ctx = new InitialContext();
            connectionFactory = (ConnectionFactory) ctx
                    .lookup("ConnectionFactoryName");*/
             connectionFactory = new ActiveMQConnectionFactory(
                        ActiveMQConnection.DEFAULT_USER,
                        ActiveMQConnection.DEFAULT_PASSWORD,
                        "tcp://localhost:61616");
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(useTransation,
                    Session.AUTO_ACKNOWLEDGE);
            destination = session.createQueue("TEST.QUEUE");
            consumer = session.createConsumer(destination);
       
                System.out.println("init");

                
                Message message = consumer.receive(100000);
                while(true){
                    if (null ==message) {
                       System.out.println("收到消息" + ((TextMessage) message).getText());
                    } else{
                        continue;
                    }
               
                }
            

        } catch (Exception e) {

            e.printStackTrace();
        }finally{
            try {
                connection.close();
                session.close();
            } catch (JMSException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            
        }

    }
}

Name Number Of Pending Messages   Number Of Consumers   Messages Enqueued   Messages Dequeued   Views   Operations  
example.A 0 1 0 0 BrowseActive Consumers
       
Send To    Purge    Delete
TEST.QUEUE 0 1 1 1 BrowseActive Consumers
       
Send To    Purge    Delete

因为针对消费比较快的消费者,我们使用同步(可以减少异步发送消息时产生的上下文切换),针对消费比较慢的消费者,我们使用异步。 同步发送消息的缺点是,对于生产者发送的消息,如果消费者消费的比较慢,那么生产者就会被阻塞。

默认配置是异步发送 (displayatchAsync=true),这种配置也保证了MQ的高性能。