[RabbitMQ]队列
来自: http://www.cnblogs.com/w1991/p/5178701.html
RabbitMQ是一个message broker,本质上就是接收producer的消息,传递给consumer,其中可以根据给定的需要设置消息路由、缓存、持久化。
简单队列
最简单的队列如下图,producer将消息推送到queue中,queue将消息传递给consumer。可以有多个producer向同一个queue推送消息,也可以有多个consumer从queue接收消息。
用Java开发producer和consumer需要下载 Java client library 。
producer代码如下:
// Send.java import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Send { private final static String QUEUE_NAME = "SimpleQueue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } }
consumer代码如下:
// Recv.java import com.rabbitmq.client.*; import java.io.IOException; public class Recv { private final static String QUEUE_NAME = "SimpleQueue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; channel.basicConsume(QUEUE_NAME, true, consumer); } }
编译并运行:
javac -cp rabbitmq-client.jar Send.java Recv.java java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Send java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Recv
运行结果如下:
任务队列(task quque)
任务队列是queue将消息分发给多个consumer进行处理。
// Task.java import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Task { private final static String QUEUE_NAME = "TaskQueue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = argv[0]; channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } }
// Worker.java import com.rabbitmq.client.*; import java.io.IOException; public class Worker { private final static String QUEUE_NAME = "TaskQueue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); } } }; channel.basicConsume(QUEUE_NAME, true, consumer); } private static void doWork(String task) throws InterruptedException { try { Thread.sleep(1000); } catch (InterruptedException _ignored) { Thread.currentThread().interrupt(); } } }
编译代码:
javac -cp rabbitmq-client.jar Task.java Worker.java
分别打开三个console,其中两个运行Worker:
java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Worker
另一个console运行Task发送消息:
java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Task msg
运行结果如下:
使用任务队列可以实现并行任务处理。默认情况下,RabbitMQ会按消息的顺序依次分发给consumer,也就是说平均每个consumer会接收相同数量的消息。这种消息分发方式称为循环调度(round-robin dispatching)。
消息反馈(message acknowledgment)
如果consumer在执行某个消息的任务过程中失败了,会怎么样呢?在当前模式下,RabbitMQ一旦将消息发送给了consumer就从内存中删除该消息,这种情况下,如果杀掉了worker,那么分配给该worker的未处理的和正在处理中的消息就会丢失。但是我们肯定不希望任务丢失,我们希望一个worker挂掉之后,它未处理完成的任务将分配给其他worker处理。
为了确保消息不会丢失,RabbitMQ支持消息反馈。消息反馈就是consumer告诉RabbitMQ某个消息已经被接收并处理完成了,可以从队列中删除了。
如果一个consumer挂掉了(channel或connection被关闭,或者TCP连接丢失),没有发送反馈(ack),那么RabbitMQ就知道消息没有被处理完成,就把它重新放入队列。这时如果有其他consumer在线,那么就把这个消息重新分发。这样就确保在worker挂掉时消息也不会丢失了。
消息处理是没有超时机制的。RabbitMQ会在consumer挂掉时才重新分发消息。因此,即使消息处理时间很长很长都没有关系。
默认消息反馈是开启的。在上面的例子中通过指定 autoAck=true 关闭了反馈。消息反馈的代码如下:
final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(envelope.getDeliveryTag(), false); } } };
PS:可以使用rabbitmqctl查看未反馈的消息。
rabbitmqctl list_queues name messages_ready messages_unacknowledged
消息持久化
之前的消息反馈可以确保consumer挂掉时消息不会丢失,但是如果RabbitMQ服务器停止或者崩溃了,队列和其中的消息就都丢失了。要确保服务器上的消息不丢失,需要将队列和消息都标识成持久化的。
首先,确保RabbitMQ不会丢失队列:
boolean durable = true; channel.queueDeclare("TaskQueue", durable, false, false, null);
尽管这代码是正确的,但是目前的环境下却不起作用。因为之前已经声明了一个非持久化的队列TaskQueue。RabbitMQ不允许用不同的参数重新定义已经存在的队列。换一个新的队列名称就行了。
boolean durable = true; channel.queueDeclare("DurableTaskQueue", durable, false, false, null);
这样RabbitMQ重启后队列DurableTaskQueue是仍然存在的。接下来需要将消息标识成持久化的,通过设置属性MessageProperties为PERSISTENT_TEXT_PLAIN:
import com.rabbitmq.client.MessageProperties; channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
PS:将消息标识成持久化不能完全保证消息不会丢失。RabbitMQ在接收到消息写入磁盘之间仍有很短的时间间隔,而且RabbitMQ不会每次保存消息时都调用fsync,因此可能消息只被写入了缓存而还没有写入磁盘。如果需要更强的持久化方案,可以使用publisher confirm。
公平分配(fair dispatch)
默认的循环调度方案可能会造成负载不平均。例如,有两个worker,偶数号的任务处理很耗时,而奇数号的任务处理很快,那么就会导致一个worker处理很慢,积压很多任务,而另一个worker处理很快,比较空闲。这是因为RabbitMQ在消息进入队列后就立刻分发出去,它不关心consumer中未反馈的消息数量,只是简单地将第n个消息分发给第n个consumer。
为了让每个consumer的负载均匀,通过basicQos方法设置参数prefetchCount为1。这样,consumer中的消息将不超过一条,当consumer处理完并发送反馈后RabbitMQ才会向它分发新的消息。RabbitMQ在接收到新消息后,如果有空闲的consumer就将消息发给它,否则就一直等待有空闲的consumer再分发。
int prefetchCount = 1; channel.basicQos(prefetchCount);
PS:如果所有的任务都很耗时,worker都很忙,那么可能会导致队列被塞满,这时需要增加worker,或者采用其他策略。
最后的完整代码如下:
// Task.java import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; public class Task { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); String message = argv[0]; channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); }
// Worker.java import com.rabbitmq.client.*; import java.io.IOException; public class Worker { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); int prefetchCount = 1; channel.basicQos(prefetchCount); final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(envelope.getDeliveryTag(), false); } } }; channel.basicConsume(TASK_QUEUE_NAME, false, consumer); } private static void doWork(String task) { try { Thread.sleep(1000); } catch (InterruptedException _ignored) { Thread.currentThread().interrupt(); } } }
参考资料:
</div>