RabbitMQ架构
mssj0912
7年前
<p>RabbitMQ是一个高可用的消息中间件,支持多种协议和集群扩展。并且支持消息持久化和镜像队列,适用于对消息可靠性较高的场合,基本模型如下。</p> <p><img src="https://simg.open-open.com/show/3931949bb4852036ab995b325a182592.png"></p> <p>其客户端使用方式</p> <pre> <code class="language-java">from kombu import Connection, Exchange, Queue media_exchange = Exchange('media', 'direct', durable=True) video_queue = Queue('video', exchange=media_exchange, routing_key='video') def process_media(body, message): print body message.ack() # connections with Connection('amqp://guest:guest@localhost//') as conn: # produce producer = conn.Producer(serializer='json') producer.publish({'name': '/tmp/lolcat1.avi', 'size': 1301013}, exchange=media_exchange, routing_key='video', declare=[video_queue]) # the declare above, makes sure the video queue is declared # so that the messages can be delivered. # It's a best practice in Kombu to have both publishers and # consumers declare the queue. You can also declare the # queue manually using: # video_queue(conn).declare() # consume with conn.Consumer(video_queue, callbacks=[process_media]) as consumer: # Process messages and handle events on all channels while True: conn.drain_events() </code></pre> <p>示例中的发布端和消费端是同一方,而实际中的使用方式一般有多种场景,topic模式、fanout模式、direct模式和RPC模式。</p> <p><img src="https://simg.open-open.com/show/c2cab17b6ee560943403c1d998d7a9d7.png"> <img src="https://simg.open-open.com/show/c813069a23d72a7a3f519d896270a62a.png"></p> <ol> <li>topic模式,按照设置的路由信息(routing key)将消息路由到一个或者多个消费端,而消息只能由一个消费者消费一次。一个消费者可以设置多个路由信息,可以同时获取多个消费者发送的消息;</li> <li>fanout模式,与topic模式唯一的区别是同一消息会发送到订阅(binding)的多个消费者;</li> <li>direct模式,一对一模式,实际中比较少用;</li> <li>RPC模式,结合topic和direct模式,发送消息的同时指定要接受的消息。</li> </ol> <h2>RabbitMQ监控树</h2> <p>为了高可靠,Erlang中实际的工作进程(Erlang进程,并不是系统进程)都有一个监控进程,监控进程负责(一个或多个)工作进程的创建、销毁和重启。监控进程和工作进程的关系如图。</p> <p><img src="https://simg.open-open.com/show/a856d4d7116d9fff97ed4f5644c18bd5.gif"></p> <ol> <li>方块图是监控进程;</li> <li>圆圈是工作进程;</li> <li>方块中的”1“(one_for_one)和”a“(one_for_all)代表不同的监控策略</li> </ol> <p>one_for_one 监控策略,一个工作进程崩溃,则只重启崩溃的工作进程。</p> <p><img src="https://simg.open-open.com/show/63550e4d3caa83db90fd9d99308b0bd0.gif"></p> <p>one_for_all监控策略,一个工作进程崩溃,则销毁并重启所有工作进程</p> <p><img src="https://simg.open-open.com/show/4f9166bd46cbf384d3619869c45f6f5b.gif"> 在RabbitMQ中还有一种 <strong>simple_one_for_one监控策略</strong> ,与 <strong>one_for_one监控策略</strong> 相同,只不过重启工作进程时的启动参数是固定的。RabbitMQ网络框架也遵循该原则。</p> <p><img src="https://simg.open-open.com/show/9205f06d8ed482ea685b6b75f8198252.png"></p> <h2>RabbitMQ消息架构</h2> <p>当client端链接服务器时,RabbitMQ会启动一系列监控和工作进程来处理网络连接。</p> <p><img src="https://simg.open-open.com/show/a2763af3034ab97f9ce717eddecf16c9.png"> 为了降低TCP链接数量,多个消费者共享同一个链接Connection,但是每个消费者独享一个管道channel,用consumer_tag标识。consumer_tag在Connection唯一,从1开始累加,当重连接时需要匹配该tag。每个消费者对应独立的一套rabbit_channel_sup_sup->rabbit_channel_sup->rabbit_channel|rabbit_writer|rabbit_limiter系列进程。</p> <h2>RabbitMQ网络框架时序图</h2> <p><img src="https://simg.open-open.com/show/735933d45bdd9eebdf57e85d8ecc655d.png"> client建立链接后,RabbitMQ通过tcp_acceptor进程处理accept成功后返回的clientfd。</p> <p><img src="https://simg.open-open.com/show/c819cf50f1447e883cdc8944475d5e7a.png"> rabbit_reader从TCP链接中读取数据,然后根据协议回调函数处理客户端的各种请求。</p> <h2>RabbitMQ消息处理流程</h2> <p><img src="https://simg.open-open.com/show/f8eddd13ef623384c0f2754761ed1dbf.png"> RabbitMQ先验证权限;然后检查Exchange是否存在,不存在则创建;检查消息是否合法以及是否需要confirm等;根据路由信息选择消费队列;检查消费队列是否存在,有则将消息发送给消息队列;检查消费者是否存在,存在则将消息发送给消费者client端。</p> <p><img src="https://simg.open-open.com/show/b57c4db61d20b17821a88d408e70d86d.png"> RabbitMQ会根据不同的消息的不同类型做不同的处理:</p> <ol> <li>不持久化消息,如果没有消费者则直接丢掉,不会入消费队列;如果有,则先入消息队列,按照入队顺序依次发送给消费者。</li> <li>持久化消息,将消息持久化成功后才给发送端发ack,然后再发送给消费者。</li> </ol> <p>(完)</p> <p> </p> <p>来自:https://fanchao01.github.io/blog/2018/02/09/rabbitmq-arch/</p> <p> </p>