如何优雅的使用RabbitMQ

alwinc 8年前
   <p>RabbitMQ无疑是目前最流行的消息队列之一,对各种语言环境的支持也很丰富,作为一个.NET developer有必要学习和了解这一工具。消息队列的使用场景大概有3种:</p>    <p>1、系统集成,分布式系统的设计。各种子系统通过消息来对接,这种解决方案也逐步发展成一种架构风格,即“通过消息传递的架构”。</p>    <p>2、当系统中的同步处理方式严重影响了吞吐量,比如日志记录。假如需要记录系统中所有的用户行为日志,如果通过同步的方式记录日志势必会影响系统的响应速度,当我们将日志消息发送到消息队列,记录日志的子系统就会通过异步的方式去消费日志消息。</p>    <p>3、系统的高可用性,比如电商的秒杀场景。当某一时刻应用服务器或数据库服务器收到大量请求,将会出现系统宕机。如果能够将请求转发到消息队列,再有服务器去消费这些消息将会使得请求变得平稳,提高系统的可用性。</p>    <h2>一、开始使用RabbitMQ</h2>    <p>RabbitMQ官网提供了详细的 <a href="/misc/goto?guid=4958188626551291166" rel="nofollow,noindex">安装步骤</a> ,另外官网还提供了RabbitMQ在六种场景的 <a href="/misc/goto?guid=4958859888550363129" rel="nofollow,noindex">使用教程</a> 。其中教程1、3、6将覆盖99%的使用场景,所以正常来说只需要搞清楚这3个教程即可快速上手。</p>    <h2>二、简单分析</h2>    <p>我们以官方提供的教程1做个简单梳理:该教程展示了Producer如何向一个消息队列(message queue)发送一个消息(message),消息消费者(Consumer)收到该消息后消费该消息。</p>    <p>1、producer端:</p>    <pre>  <code class="language-cs">var factory = new ConnectionFactory() { HostName = "localhost" };              using (var connection = factory.CreateConnection())              {                  while (Console.ReadLine() != null)                  {                      using (var channel = connection.CreateModel())                      {                          //创建一个名叫"hello"的消息队列                          channel.QueueDeclare(queue: "hello",                              durable: false,                              exclusive: false,                              autoDelete: false,                              arguments: null);                            var message = "Hello World!";                          var body = Encoding.UTF8.GetBytes(message);                            //向该消息队列发送消息message                          channel.BasicPublish(exchange: "",                              routingKey: "hello",                              basicProperties: null,                              body: body);                          Console.WriteLine(" [x] Sent {0}", message);                      }                  }              }</code></pre>    <p>该段代码非常简单,几乎到了无法精简的地步:创建了一个信道(channel)->创建一个队列->向该队列发送消息。</p>    <p>2、Consumer端</p>    <pre>  <code class="language-cs">var factory = new ConnectionFactory() { HostName = "localhost" };              using (var connection = factory.CreateConnection())              {                  using (var channel = connection.CreateModel())                  {                      //创建一个名为"hello"的队列,防止producer端没有创建该队列                      channel.QueueDeclare(queue: "hello",                                           durable: false,                                           exclusive: false,                                           autoDelete: false,                                           arguments: null);                        //回调,当consumer收到消息后会执行该函数                      var consumer = new EventingBasicConsumer(channel);                      consumer.Received += (model, ea) =>                      {                          var body = ea.Body;                          var message = Encoding.UTF8.GetString(body);                          Console.WriteLine(" [x] Received {0}", message);                      };                        //消费队列"hell"中的消息                      channel.BasicConsume(queue: "hello",                                           noAck: true,                                           consumer: consumer);                        Console.WriteLine(" Press [enter] to exit.");                      Console.ReadLine();                  }              }</code></pre>    <p>该段代码可以理解为:创建信道->创建队列->定义回调函数->消费消息。</p>    <p>该实例可以简单理解为1(producer) VS 1(consumer)的场景;</p>    <p><img src="https://simg.open-open.com/show/ec1c1ed3525c8971c26f12620578e287.png"></p>    <p>实例3则描述了publish/subscriber模式,即1(producer) VS 多个(consumer);</p>    <p><img src="https://simg.open-open.com/show/ec6eb44995034d95859ef9513c1bf674.png"></p>    <p>在以上两个示例中,producer只需要发送消息即可,并不关心consumer的返回结果。实例6则描述了一个RPC调用场景,producer发送消息后还要接收consumer的返回结果,这一场景看起来跟使用消息队列的目的有点相悖。因为使用消息队列的目的之一就是要异步,但是这一场景似乎又将异步变成了同步,不过这一场景也很有用,比如一个用户操作产生了一个消息,应用服务收到该消息后执行了一些逻辑并使得数据库发生了变化,UI会一直等待应用服务的返回结果才刷新页面。</p>    <p><img src="https://simg.open-open.com/show/77d8024a36261a440ed9796d64169b54.png"></p>    <h2>三、 发现抽象</h2>    <p>我桌子上放着一本RabbitMQ in Action,另外官网提供的文档也很详细,我感觉在一个月内我就能精通RabbitMQ,到时候简历上又可以写上“精通…”,感觉有点小得意呢... ,但是我知道这并不是使用RabbitMQ的最佳方式。</p>    <p>我们知道抽象可以帮我们隐藏掉一些技术细节,让我们将重心放在核心业务上,比如一个人问你:“大雁塔如何走?”你的回答可能是“小寨往东,一直走两站,右手边”,如果你回答:“右转45度,向前走100米,再转90度…”,对方就会迷失在这些细节中。</p>    <p>消息队列的使用过程中实际隐藏着一种抽象——服务总线(Service Bus)。</p>    <p>我们在回头看第一个例子,这个例子隐含的业务是:ClientA发送一个指令,ClientB收到该指令后做出反应。如果是这样,我们为什么要关心如何创建channel,如何创建一个queue? 我仅仅是要发送一个消息而已。另外这个例子写的其实不够健壮:</p>    <p>没有重试机制:如果ClientB第一次没有执行成功如何对该消息处理?</p>    <p>没有错误处理机制:如果ClientB在重试了N次之后还是异常如何处理该消息?</p>    <p>没有熔断机制;</p>    <p>如何对ClientA做一个schedule(计划安排),比如定时发送等;</p>    <p>没有消息审计机制;</p>    <p>无法对消息的各个状态做追踪;</p>    <p>事物处理等。</p>    <p>服务总线正是这种场景的抽象,并且为我们提供了这些机制,让我们赶快来看个究竟吧。</p>    <h2>四、初视MassTransit</h2>    <p>MassTransit是.NET平台下的一款开源免费的ESB产品,官网: <a href="/misc/goto?guid=4958537093036633352" rel="nofollow,noindex">http://masstransit-project.com/</a> ,GitHub 700 star,500 Fork,类似的产品还有 <a href="/misc/goto?guid=4958975489255492526" rel="nofollow,noindex">NServiceBus</a> ,之所以要选用MassTransit是因为他要比NServiceBus轻量级,另外在MassTransit开发之初就选用了RabbitMQ作为消息传输组建;同时我想拿他跟NServiceBus做个比较,看看他们到底有哪些侧重点。</p>    <p>1、新建控制台应用程序:Masstransit.RabbitMQ.GreetingClient</p>    <p>使用MassTransit可以从Nuget中安装:</p>    <pre>  <code class="language-cs">Install-Package MassTransit.RabbitMQ</code></pre>    <p>2、创建服务总线,发送一个命令</p>    <pre>  <code class="language-cs">static void Main(string[] args)          {              Console.WriteLine("Press 'Enter' to send a message.To exit, Ctrl + C");                var bus = BusCreator.CreateBus();              var sendToUri = new Uri($"{RabbitMqConstants.RabbitMqUri}{RabbitMqConstants.GreetingQueue}");                while (Console.ReadLine()!=null)              {                  Task.Run(() => SendCommand(bus, sendToUri)).Wait();              }                Console.ReadLine();          }            private static async void SendCommand(IBusControl bus,Uri sendToUri)          {              var endPoint =await bus.GetSendEndpoint(sendToUri);              var command = new GreetingCommand()              {                  Id = Guid.NewGuid(),                  DateTime = DateTime.Now              };                await endPoint.Send(command);                Console.WriteLine($"send command:id={command.Id},{command.DateTime}");           }</code></pre>    <p>这一段代码隐藏了众多关于消息队列的细节,将我们的注意力集中在发送消息上,同时ServiceBus提供的API也更接近业务,我们虽然发送的是一个消息,但是在这种场景下体现出来是一个命令,Send(command)这一API描述了我们的意图。</p>    <p>3、服务端接收这一命令</p>    <p>新建一个命令台控制程序:Masstransit.RabbitMQ.GreetingServer</p>    <pre>  <code class="language-cs">var bus = BusCreator.CreateBus((cfg, host) =>              {                  cfg.ReceiveEndpoint(host, RabbitMqConstants.GreetingQueue, e =>                  {                      e.Consumer<GreetingConsumer>();                    });              });</code></pre>    <p>这一代码可以理解为服务端在监听消息,我们在服务端注册了一个名为“GreetingConsumer”的消费者,GreetingConsumer的定义:</p>    <pre>  <code class="language-cs">public class GreetingConsumer :IConsumer<GreetingCommand>      {          public async Task Consume(ConsumeContext<GreetingCommand> context)          {                await Console.Out.WriteLineAsync($"receive greeting commmand: {context.Message.Id},{context.Message.DateTime}");          }      }</code></pre>    <p>该consumer可以消费类型为GreetingCommand的消息。这一实例几乎隐藏了有关RabbitMQ的技术细节,将代码中心放在了业务中,将这两个控制台应用跑起来试试:</p>    <p><img src="https://simg.open-open.com/show/813281f0a36950109532acf614aa1379.png"></p>    <h2>五、实现Publish/Subscribe模式</h2>    <p>发布/订阅模式使得基于消息传递的软件架构成为可能,这一能力表现为ClientA发送消息X,ClientB和ClientC都可以订阅消息X。</p>    <p>1、我们在上面的例子中改造一下,当GreetingConsumer收到GreetingCommand后发送一个GreetingEvent:</p>    <pre>  <code class="language-cs">var greetingEvent = new GreetingEvent()              {                  Id = context.Message.Id,                  DateTime = DateTime.Now              };                await context.Publish(greetingEvent);</code></pre>    <p>2、新建控制台程序Masstransit.RabbitMQ.GreetingEvent.SubscriberA用来订阅GreetingEvent消息:</p>    <pre>  <code class="language-cs">var bus = BusCreator.CreateBus((cfg, host) =>              {                  cfg.ReceiveEndpoint(host, RabbitMqConstants.GreetingEventSubscriberAQueue, e =>                  {                      e.Consumer<GreetingEventConsumer>();                  });              });                bus.Start();</code></pre>    <p>定义GreetingEventConsumer:</p>    <pre>  <code class="language-cs">public class GreetingEventConsumer:IConsumer<Greeting.Message.GreetingEvent>      {          public async Task Consume(ConsumeContext<Greeting.Message.GreetingEvent> context)          {              await Console.Out.WriteLineAsync($"receive greeting event: id {context.Message.Id}");          }      }</code></pre>    <p>这一代码跟Masstransit.RabbitMQ.GreetingServer接受一个命令几乎一模一样,唯一的区别在于:</p>    <p>在Send/Receive模式中Client首先要获得对方的终结点(endpoint),直接向该终结点发送命令。Server方监听自己的终结点并消费命令。</p>    <p>而Publish/Subscribe模式中Client publish一个事件,SubscriberA在自己的终结点(endpointA)监听事件,SubscriberB在自己的终结点(endpointB)监听事件。</p>    <p>3、根据上面的分析再定义一个Masstransit.RabbitMQ.GreetingEvent.SubscriberB</p>    <p>4、将4个控制台应用程序跑起来看看</p>    <p><img src="https://simg.open-open.com/show/68fd3ce6d9695a5fe394c87baf37d013.png"></p>    <h2>六、实现RPC模式</h2>    <p>这一模式在Masstransit中被称作Request/Response模式,通过IRequestClient<ISimpleRequest, ISimpleResponse> 接口来实现相关操作。一个相关的例子在官方的 <a href="/misc/goto?guid=4959673429225046637" rel="nofollow,noindex">github</a> 。</p>    <p>结束语:本篇文章分析了如何使用Masstransit来抽象业务,避免直接使用具体的消息队列,当然本文提到的众多服务总线机制,如“重试、熔断等”并没有在该文中出现,需要大家进一步去了解该项目。</p>    <p>通过对Masstransit的一些试用和NServiceBus的对比,Masstransit在实际项目中很容易上手并且免费,各种API定义的也非常清晰,但是官方的文档有点过于简单,实际使用中还需要去做深入的研究。作为.NET平台下为数不多的ESB开源产品,其关注程度还是不够,期待大家为开源项目做出贡献。</p>    <p>本文例子提供下载: <a href="/misc/goto?guid=4959673429323393159" rel="nofollow,noindex">http://git.oschina.net/richieyangs/RabbitMQ.Practice</a></p>    <p> </p>    <p>来自: <a href="/misc/goto?guid=4959673429413032260" rel="nofollow">http://www.cnblogs.com/richieyang/p/5492432.html</a></p>    <p> </p>