RabbitMQ基本原理

RabbitMQ是什么?

一款采用AMQP(高级消息队列协议)的消息队列技术,用于在分布式系统中存储和转发消息。通过消息队列可以实现服务之间的高度解耦异步通信、流量削峰

消息基于什么传输?

我们应该都是知道,网络数据传输协议TCP的性能并不高,因为它要求通信双方首先建立连接,才能正常进行数据通信。

如果RabbitMQ使用TCP传输数据的话,频繁的TCP连接创建和销毁开销会很大。所以RabbitMQ使用信道(它是建立在真实的TCP连接内的虚拟连接)的方式来传输数据。

如何确保接收到消息生产者发过来的消息?

RabbitMQ使用发送方确认模式,来确保消息发送到RabbitMQ上。

什么是发送方确认模式?

该模式的通信方式是异步的。消息生产者应用程序A在等待信道确认消息的同时,还可以继续发送消息。

这种模式的工作方式是什么?首先需要将信道设置为confirm模式,也就是发送方确认模式。一旦消息生产者应用程序A将消息投递到消息队列后,信道会发送一个确认送达的指示给消息生产者应用程序A。假如在RabbitMQ内部发生错误从而导致消息丢失,信道会给消息生产者应用程序A发送一条未确认的消息。

如何确保消息被消费者应用程序消费的?

RabbitMQ使用接收方消息确认机制,来确保消息被消费者应用程序消费。

接收方消息确认机制的工作方式是什么?

消费者应用程序接收到每一条消息后,会进行确认操作,并告知RabbitMQ,此时RabbitMQ就会把消息从队列中删除,释放空间。

特殊情况1:如果消费者应用程序只是成功接收消息,但是没有进行确认消息的操作,同时与RabbitMQ的连接也未断开,那么RabbitMQ就会认为该消费者应用程序很忙,将降低给它分发消息的频率。

特殊情况2:如果消费者应用程序只是成功接收消息,但是在进行确认消息的操作之前,与RabbitMQ的连接断开了,那么RabbitMQ就会认为该消息没有成功被消费,会重新分发给消费者应用程序。

如何避免消息被重复投递或重复消费?

RabbitMQ内部会对每一条由消息生产者应用程序生产的消息生产一个inner-msg-id,作为去重的依据,避免重复的消息进入消息队列。

消费者应用程序在消费消息时,必须要求消息体中有一个bizId作为去重的依据,避免同一条消息被重复消费。

RabbitMQ如何确保消息不丢失?

如果在突发情况下,服务器宕机了,RabbitMQ还能恢复宕机之前正在进行的消息吗?

解决这种问题的办法,就是将消息写入磁盘上的一个持久化日志文件中。

具体是这样的:(1)只有消息生产者生产的消息先保存在这个持久化的日志文件中,才给予响应。(2)只有消息消费者成功把消息消费了,RabbitMQ才会把持久化日志文件中的消息标记为等待垃圾回收。

系统架构

Rabbitmq系统最核心的组件是ExchangeQueue,下图是系统简单的示意图。Exchange和Queue是在rabbitmq server(又叫做broker)端,producer和consumer在应用端。RabbitMQ基本原理RabbitMQ基本原理

Producer&Consumer

producer指的是消息生产者,consumer消息的消费者。

Queue

消息队列,提供了FIFO的处理机制,具有缓存消息的能力。rabbitmq中,队列消息可以设置为持久化,临时或者自动删除。

  1. 设置为持久化的队列,queue中的消息会在server本地硬盘存储一份,防止系统crash,数据丢失
  2. 设置为临时队列,queue中的数据在系统重启之后就会丢失
  3. 设置为自动删除的队列,当不存在用户连接到server,队列中的数据会被自动删除

Exchange

Exchange类似于数据通信网络中的交换机,提供消息路由策略。rabbitmq中,producer不是通过信道直接将消息发送给queue,而是先发送给Exchange。一个Exchange可以和多个Queue进行绑定,producer在传递消息的时候,会传递一个ROUTING_KEY,Exchange会根据这个ROUTING_KEY按照特定的路由算法,将消息路由给指定的queue。和Queue一样,Exchange也可设置为持久化,临时或者自动删除。

Exchange有4种类型:direct(默认),fanout, topic, 和headers,不同类型的Exchange转发消息的策略有所区别:

Direct

直接交换器,工作方式类似于单播,Exchange会将消息发送完全匹配ROUTING_KEY的Queue

fanout

广播是式交换器,不管消息的ROUTING_KEY设置为什么,Exchange都会将消息转发给所有绑定的Queue。

topic

主题交换器,工作方式类似于组播,Exchange会将消息转发和ROUTING_KEY匹配模式相同的所有队列,比如,ROUTING_KEY为user.stock的Message会转发给绑定匹配模式为 * .stock,user.stock, * . * 和#.user.stock.#的队列。( * 表是匹配一个任意词组,#表示匹配0个或多个词组)

headers

消息体的header匹配(ignore)

Binding

所谓绑定就是将一个特定的 Exchange 和一个特定的 Queue 绑定起来。Exchange 和Queue的绑定可以是多对多的关系。

virtual host

在rabbitmq server上可以创建多个虚拟的message broker,又叫做virtual hosts (vhosts)。每一个vhost本质上是一个mini-rabbitmq server,分别管理各自的exchange,和bindings。vhost相当于物理的server,可以为不同app提供边界隔离,使得应用安全的运行在不同的vhost实例上,相互之间不会干扰。producer和consumer连接rabbit server需要指定一个vhost。

通信过程

假设P1和C1注册了相同的Broker,Exchange和Queue。P1发送的消息最终会被C1消费。基本的通信流程大概如下所示:

  1. P1生产消息,发送给服务器端的Exchange
  2. Exchange收到消息,根据ROUTINKEY,将消息转发给匹配的Queue1
  3. Queue1收到消息,将消息发送给订阅者C1
  4. C1收到消息,发送ACK给队列确认收到消息
  5. Queue1收到ACK,删除队列中缓存的此条消息

Consumer收到消息时需要显式的向rabbit broker发送basic.ack消息或者consumer订阅消息时设置auto_ack参数为true。在通信过程中,队列对ACK的处理有以下几种情况:

  1. 如果consumer接收了消息,发送ack,rabbitmq会删除队列中这个消息,发送另一条消息给consumer。
  2. 如果cosumer接受了消息, 但在发送ack之前断开连接,rabbitmq会认为这条消息没有被deliver,在consumer在次连接的时候,这条消息会被redeliver。
  3. 如果consumer接受了消息,但是程序中有bug,忘记了ack,rabbitmq不会重复发送消息。
  4. rabbitmq2.0.0和之后的版本支持consumer reject某条(类)消息,可以通过设置requeue参数中的reject为true达到目地,那么rabbitmq将会把消息发送给下一个注册的consumer。