RabbitMQ的使用

jopen 10年前

1.首先我从官方下载了rabbitmq-server

http://www.rabbitmq.com/download.html

当然公司还是以java开发为主,因此下载的版本是基于linux下面的,不过我再这里下载一个ubuntu版本的,因为我开发的机子就是ubuntu.

下载好后,安装很快,ubuntu下直接dpkg -i rabbitmq-server_3.5.0-1_all.deb

当然其他linux系统可以下载tar.gz包解压,但是还需要安装erlang.解压后可以执行以下命令就可以启动rabbitmq了

解压的目录sbin下找到rabbitmq-server运行即可,当然这里不需要详细说安装了,可以参考官方文档进行安装.

http://www.rabbitmq.com/install-generic-unix.html

当然,为了我们能可视化查看rabbitmq-server的运行情况和满足其他开发需求我们可以启用插件

rabbitmq-plugins enables,这样我们可以打开http://127.0.0.1:15672/

使用guest,guest登陆后可以看到http管理界面.

2.理解rabbitmq的工作原理

第一步安装好服务后,接下来我们就要进行生产消息和消费消息了.

但是要开发必须要理解rabbitmq的工作原理.当然如果我长篇大论记下来估计要记录几十页,因此如果英语比较好的可以先看看官方文档.

http://www.rabbitmq.com/getstarted.html

当然如果英语实在是渣的话,不妨参考下面的链接:

http://rabbitmq-into-chinese.readthedocs.org/zh_CN/latest/

看完这些文档之后,估计会对rabbitmq理解的差不多了.

当然网上也有很多人写的博客,都可以参照的.

3.java客户端开发

当然按照上述文档我们可以开发出自己需要的客户端,但是这里我主要介绍如何使用spring-amqp来进行开发.

首先我们需要引入spring-amqp包,为了能查看调试信息,我们也引入log4j

<dependency>      <groupId>org.springframework.amqp</groupId>      <artifactId>spring-rabbit</artifactId>      <version>${spring-rabbit.version}</version>  </dependency>  <dependency>      <groupId>log4j</groupId>      <artifactId>log4j</artifactId>      <version>1.2.17</version>  </dependency>

引入后就要进行开发了,当然spring-amqp如何配置这里不想长篇大论,因此这里只介绍两种发送方式

异步发送消息

其实这种方式是很容易的,大多数网上文章都是基于配置文件的,但是如果我们使用代码来创建,那么更加会理解spring-amqp的设计原理的,能使用代码创建也就更加能使用配置文件了

CachingConnectionFactory factory=new CachingConnectionFactory(); //创建链接工厂,使用带缓存功能的工厂可以基于Channel创建多个Channel来处理    //设置rabbitmq-server链接地址  factory.setAddresses("localhost:5672");    //设置登陆用户名,可以在rabbit-server http页面创建用户  factory.setUsername("guest");    //设置登陆密码(在本机上可以不用设置)  factory.setPassword("guest");    //启动channel的个数并缓存  factory.setChannelCacheSize(channelSize); //创建admin进行队列绑定,exchange绑定 RabbitAdmin admin=new RabbitAdmin(factory);    //创建名字为qingting-queue的队列,durable是否是持久化消息到服务端  Queue queue=new Queue("qingting-queue",durable);    //此时会在rabbitmq-servers上创建一个queue  admin.declareQueue(queue);    //创建exchange并进行订阅到服务端,这里创建的是topic类型的exchange,也就是路由key可以是正则表达式  Exchange exchange=new TopicExchange("qingting-exchange",durable,autoDelete);    //rabbitmq-server上会创建一个名称交exchangeName的exchange,同时类型为topic,而且特征有D表示durable  admin.declareExchange(exchange); /**  * 使用routeKey将队列绑定到exchange上面,这个时候,rabbitmq-server上的queue和exchange会有一个通过routeKey的映射关系  */ admin.declareBinding(BindingBuilder.bind(queue).to(exchange).with("qingting.*").noargs());    //创建一个Template来进行发消息,也可以收消息等操作  template = new RabbitTemplate(factory);    //这里讨论发消息,我们通过exchange来进行发消息,同时设置routeKey,这时候因为是topic类型的exchange,通过  比对设置的routeKey跟上面绑定的bindRouteKey做正则匹配,如果匹配上了就会把消息推送到queueName的这个队列  template.setExchange("qingting-exchange");  template.setRoutingKey("qingting.route.key");    //设置将消息转换为json类型,当然自己可以设置其他的转换器或者自己实现  template.setMessageConverter(new Jackson2JsonMessageConverter()); 

有了上面配置后,那么我们可以进行发送同步消息了

template.convertAndSend();

因为上述设置了消息转换器,因此最好使用改方法,当然还有template.send()方法其实使用该方法确实结果Jackson2JsonMessageConverter将消息内容体处理成json并转换为字节数组,

同时还会将消息的属性配置中conentType设置为application/json 并且head中设置一些消息内容的对象类型等信息,之所以这样做是因为,消费者接收消息,也可以设置json转换器直接将拿接收的消息属性中head的值将消息体内容进行转换成对象,可以在rabbitmq管理页面看到发送的消息会附带如下信息

Exchange:        qingting-exchange  //上述template设置的exchange  Routing Key:    qingting.route.key       //template设置的routeKey  Properties      headers:          __KeyTypeId__:java.lang.Object     //转json附带的信息,方便转换          __TypeId__:java.util.HashMap          __ContentTypeId__:java.lang.Object      content_encoding:UTF-8      content_type:application/json         Payload      {"username":"81a40c08-5a01-46fa-93c3-981fcccb3c8c","index":0}

    因此可以看到http页面上在绑定的队列会有一条消息到绑定的队列,因为这时候没有消费者订阅队列,我们设置的是持久化durable,因此会一一直留在server中