[译]RabbitMQ系列教程(一):Hello World
来自: http://my.oschina.net/andylucc/blog/605746
RabbitMQ是一个消息中间件。基本思想非常简单:接收和转发消息。你可以把它想象成一个邮局:当我们往邮箱里面投递一个信件的时候,我们非常确信邮政快递员能够把我们的邮件送达到接受者的手中。使用这个隐喻,RabbitMQ是一个邮箱,是一个邮政局,是一个邮政快递员。
与传统的邮政局不同的是,RabbitMQ处理的不是纸质的邮件,而是二进制数据构成的消息。
关于RabbitMQ和消息的一些行话:
1,生产消息即发送,一个发送消息的进程叫Producer。我们用下面的图表示(一个被标记为P的圆圈):
2,queue代表接消息的邮箱,存在于RabbitMQ中。尽管消息在应用和RabbitMQ之间流动,但是他们只能被存储在queue之中。queue是没有任何限制的,它可以用来存储任何你想存储的消息,它本质上是一个无限制的buffer。多个producer可以向一个queue中发送消息。多个consumer可以从一个queeue中接收消息。队列可以用下面的图来描述:
3,consuming意味着接收,consumer是一个想要接收消息的进程,同样,我们可以用下面的图来表达consumer:
值得注意的是,producer、consumer和broker不一定是在同一台机器上,的确在通常的使用中很少有人在同一台机器上使用它。
Hello World!
(使用pika 0.10.0 Python client)
下面的例子不会太复杂——我们发送一个消息,接收它并把它打印在终端。为了完成这件小事,我们需要写两端小程序:一个是用来发送消息的,另一个是用来接收消息并打印的。
我们的设计大概可以用下面的图来表示:
producer生产消息发送至hello queue,consumer从hello queue中接收消息。
RabbitMQ libraries RabbitMQ是基于AMQP 0.9.1版本,它是一个开放的,用于处理消息的协议。现在已经有基于不同语言的多种版本的RabbitMQ客户端,我们将会使用Pika,它是RabbitMQ团队推荐的一个客户端,我们可以使用pip包管理工具来安装它。
发送消息:
我们的第一个小程序是send.py,我们将会向队列发送一个消息。这个时候我们需要做的第一件事情就是和RabbitMQ服务器建立连接。
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel()
通过上面的代码我们和本覅RabbitMQ的broker建立了一条物理连接,如果想要和不同的机器建立连接,这里只需要修改一下IP地址或域名即可。接下来,在发送消息之前,我们需要确保接收队列存在。如果我们将消息发送至一个不存在的队列,RabbitMQ会直接将消息丢弃。我们先创建一个叫“hello”的队列:
channel.queue_declare(queue='hello')
这个时候我们已经可以发送消息了,我们的第一个消息只包含一个字符串“Hello World”,我们将会把它发送至“hello”队列。
在RabbitMQ中,消息不是被直接送到队列的,而是首先被送到exchange中。我们目前不需要了解的那么详细,或者我们可以通过第三方的教程来学习有关exchanges的知识。我们现在所需要知道的是如何使用空字符串标识的默认exchange。这个exchange是比较特殊的,它使得我们可以指定消息应该被送往哪里。队列名字需要在routing_key参数中指定。
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')print(" [x] Sent 'Hello World!'")
在退出程序之前,我们需要确认网络缓冲区的内容被刷新,内容被发送出去,我们可以通过关闭连接来达到效果:
connection.close()
接收消息:
我们的第二个程序receive.py将会送队列中接收消息并且将消息的内容打印出来。因此,我们需要再次连接RabbitMQ服务器,相关代码和之前相同。下一步,和之前一样,我们必须确保队列存在。因此我们可以用queue_declare。对于queue_declare接口,我们可以多次调用它,而结果是只有一个队列会被创建。
channel.queue_declare(queue='hello')
你可能有点疑问,我们为什么要再一次declare队列,在之前的代码中,我们已经declare了呀。主要是因为我们一定要确认在发送消息前队列是存在的,否则消息将会被丢弃。
我们可以通过命令行来查看队列: $ sudo rabbitmqctl list_queues Listing queues ... hello 0 ...done.
从消息队列接收消息是相对复杂的一件事,我们通过给队列绑定一个回调函数,当我们接收到一条消息的时候,回调函数会被pika库调用,下面这个回调函数将会把消息打印出来。
def callback(ch, method, properties, body): print(" [x] Received %r" % body)
接下来,接下来我们将要告诉RabbitMQ这个回调函数将会从指定的“hello”队列来接收消息:
channel.basic_consume(callback, queue='hello', no_ack=True)
no_ack参数将会在后面介绍。
最后,我们进入了一个死循环,监听消息,当有消息的时候我们将消息打印出来,然后继续监听。
print(' [*] Waiting for messages. To exit press CTRL+C')channel.start_consuming()
下面的完整的代码:
send.py
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='hello') channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') print(" [x] Sent 'Hello World!'") connection.close()
receive.py
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print(" [x] Received %r" % body) channel.basic_consume(callback, queue='hello', no_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
现在我们可以在终端试运行我们的程序,首先,我们来向队列发送一条消息:
$ python send.py [x] Sent 'Hello World!'
然后来接收消息:
$ python receive.py [*] Waiting for messages. To exit press CTRL+C [x] Received 'Hello World!'
好了,例子到这里结束,下一节会介绍如何创建一个工作队列。