RabbitMQ的使用
1.首先我从官方下载了rabbitmq-server
http://www.rabbitmq.com/download.html
当然公司还是以java开发为主,因此下载的版本是基于linux下面的,不过我再这里下载一个ubuntu版本的,因为我开发的机子就是ubuntu.
下载好后,安装很快,ubuntu下直接dpkg -i rabbitmq-server_3.5.0-1_all.deb
当然其他linux系统可以下载tar.gz包解压,但是还需要安装erlang.解压后可以执行以下命令就可以启动rabbitmq了
解压的目录sbin下找到rabbitmq-server运行即可,当然这里不需要详细说安装了,可以参考官方文档进行安装.
http://www.rabbitmq.com/install-generic-unix.html
当然,为了我们能可视化查看rabbitmq-server的运行情况和满足其他开发需求我们可以启用插件
rabbitmq-plugins enables,这样我们可以打开http://127.0.0.1:15672/
使用guest,guest登陆后可以看到http管理界面.
2.理解rabbitmq的工作原理
第一步安装好服务后,接下来我们就要进行生产消息和消费消息了.
但是要开发必须要理解rabbitmq的工作原理.当然如果我长篇大论记下来估计要记录几十页,因此如果英语比较好的可以先看看官方文档.
http://www.rabbitmq.com/getstarted.html
当然如果英语实在是渣的话,不妨参考下面的链接:
http://rabbitmq-into-chinese.readthedocs.org/zh_CN/latest/
看完这些文档之后,估计会对rabbitmq理解的差不多了.
当然网上也有很多人写的博客,都可以参照的.
3.java客户端开发
当然按照上述文档我们可以开发出自己需要的客户端,但是这里我主要介绍如何使用spring-amqp来进行开发.
首先我们需要引入spring-amqp包,为了能查看调试信息,我们也引入log4j
<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>${spring-rabbit.version}</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency>
引入后就要进行开发了,当然spring-amqp如何配置这里不想长篇大论,因此这里只介绍两种发送方式
异步发送消息
其实这种方式是很容易的,大多数网上文章都是基于配置文件的,但是如果我们使用代码来创建,那么更加会理解spring-amqp的设计原理的,能使用代码创建也就更加能使用配置文件了
CachingConnectionFactory factory=new CachingConnectionFactory(); //创建链接工厂,使用带缓存功能的工厂可以基于Channel创建多个Channel来处理 //设置rabbitmq-server链接地址 factory.setAddresses("localhost:5672"); //设置登陆用户名,可以在rabbit-server http页面创建用户 factory.setUsername("guest"); //设置登陆密码(在本机上可以不用设置) factory.setPassword("guest"); //启动channel的个数并缓存 factory.setChannelCacheSize(channelSize); //创建admin进行队列绑定,exchange绑定 RabbitAdmin admin=new RabbitAdmin(factory); //创建名字为qingting-queue的队列,durable是否是持久化消息到服务端 Queue queue=new Queue("qingting-queue",durable); //此时会在rabbitmq-servers上创建一个queue admin.declareQueue(queue); //创建exchange并进行订阅到服务端,这里创建的是topic类型的exchange,也就是路由key可以是正则表达式 Exchange exchange=new TopicExchange("qingting-exchange",durable,autoDelete); //rabbitmq-server上会创建一个名称交exchangeName的exchange,同时类型为topic,而且特征有D表示durable admin.declareExchange(exchange); /** * 使用routeKey将队列绑定到exchange上面,这个时候,rabbitmq-server上的queue和exchange会有一个通过routeKey的映射关系 */ admin.declareBinding(BindingBuilder.bind(queue).to(exchange).with("qingting.*").noargs()); //创建一个Template来进行发消息,也可以收消息等操作 template = new RabbitTemplate(factory); //这里讨论发消息,我们通过exchange来进行发消息,同时设置routeKey,这时候因为是topic类型的exchange,通过 比对设置的routeKey跟上面绑定的bindRouteKey做正则匹配,如果匹配上了就会把消息推送到queueName的这个队列 template.setExchange("qingting-exchange"); template.setRoutingKey("qingting.route.key"); //设置将消息转换为json类型,当然自己可以设置其他的转换器或者自己实现 template.setMessageConverter(new Jackson2JsonMessageConverter());
有了上面配置后,那么我们可以进行发送同步消息了
template.convertAndSend();
因为上述设置了消息转换器,因此最好使用改方法,当然还有template.send()方法其实使用该方法确实结果Jackson2JsonMessageConverter将消息内容体处理成json并转换为字节数组,
同时还会将消息的属性配置中conentType设置为application/json 并且head中设置一些消息内容的对象类型等信息,之所以这样做是因为,消费者接收消息,也可以设置json转换器直接将拿接收的消息属性中head的值将消息体内容进行转换成对象,可以在rabbitmq管理页面看到发送的消息会附带如下信息
Exchange: qingting-exchange //上述template设置的exchange Routing Key: qingting.route.key //template设置的routeKey Properties headers: __KeyTypeId__:java.lang.Object //转json附带的信息,方便转换 __TypeId__:java.util.HashMap __ContentTypeId__:java.lang.Object content_encoding:UTF-8 content_type:application/json Payload {"username":"81a40c08-5a01-46fa-93c3-981fcccb3c8c","index":0}
因此可以看到http页面上在绑定的队列会有一条消息到绑定的队列,因为这时候没有消费者订阅队列,我们设置的是持久化durable,因此会一一直留在server中