# 一.基础概念

# 1.什么是 RabbitMQ?

RabbitMQ 是一个在 AMQP(Advanced Message Queuing Protocol )基础上实现的,可复用的企业消息系统。它可以用于大型软件系统各个模块之间的高效通信,支持高并发,支持可扩展。它支持多种客户端如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等,支持 AJAX,持久化,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

RabbitMQ 是使用 Erlang 编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP, STOMP,也正是如此,使的它变的非常重量级,更适合于企业级的开发。它同时实现了一个 Broker 构架,这意味着消息在发送给客户端时先在中心队列排队,对路由(Routing)、负载均衡(Load balance)或者数据持久化都有很好的支持。

# 2.RabbitMQ 的特点?

  • 可靠性: RabbitMQ 使用一些机制来保证可靠性, 如持久化、传输确认及发布确认等。
  • 灵活的路由 : 在消息进入队列之前,通过交换器来路由消息。对于典型的路由功能, RabbitMQ 己经提供了一些内置的交换器来实现。针对更复杂的路由功能,可以将多个交换器绑定在一起, 也可以通过插件机制来实现自己的交换器。
  • 扩展性: 多个 RabbitMQ 节点可以组成一个集群,也可以根据实际业务情况动态地扩展集群中节点。
  • 高可用性 : 队列可以在集群中的机器上设置镜像,使得在部分节点出现问题的情况下队 列仍然可用。
  • 多种协议: RabbitMQ 除了原生支持 AMQP 协议,还支持 STOMPMQTT 等多种消息 中间件协议。
  • 多语言客户端 :RabbitMQ 几乎支持所有常用语言,比如 JavaPython、 Ruby、 PHP、 C#、 JavaScript 等。
  • 管理界面 : RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息、集群中的节点等。
  • 插件机制 : RabbitMQ 提供了许多插件 ,以实现从多方面进行扩展,当然也可以编写自 己的插件。

# 3.AMQP 是什么?

RabbitMQ 就是 AMQP 协议的 Erlang 的实现(当然 RabbitMQ 还支持 STOMP2MQTT3 等协议 ) AMQP 的模型架构 和 RabbitMQ 的模型架构是一样的,生产者将消息发送给交换器,交换器和队列绑定 。

RabbitMQ 中的交换器、交换器类型、队列、绑定、路由键等都是遵循的 AMQP 协议中相应的概念。目前 RabbitMQ 最新版本默认支持的是 AMQP 0-9-1。

AMQP 协议的三层

说明
模块层(Module Layer) 协议最高层,定义客户端调用的命令,用于业务逻辑。
会话层(Session Layer) 中间层,负责将客户端命令发送给服务器,处理可靠性、同步和错误。
传输层(Transport Layer) 最底层,传输二进制数据流,提供帧处理、信道复用、错误检测和数据表示等。

AMQP 模型的三大组件

AMQP 模型组件 描述
交换器(Exchange) 消息代理服务器中用于把消息路由到队列的组件。
队列(Queue) 用来存储消息的数据结构,位于硬盘或内存中。
绑定(Binding) 一套规则,告知交换器应该将消息投递给哪个队列。

# 4.RabbitMQ 的组件

RabbitMQ 是一种开源的消息代理(message broker)软件,用于在应用程序之间传递消息、任务和数据。它实现了高级消息队列协议(AMQP),并提供了可靠的消息传递机制。RabbitMQ 由多个组件组成,这些组件一起协同工作,以确保消息在应用程序之间可靠地传递和处理。以下是 RabbitMQ 的一些核心组件:

  1. Producer(生产者):Producer 负责创建并发布消息到 RabbitMQ 的交换机。生产者将消息发送到交换机,然后交换机负责将消息路由到一个或多个队列。

  2. Exchange(交换机):交换机是消息的接收路由中心。生产者将消息发送到交换机,然后交换机将消息根据规则路由到一个或多个队列。RabbitMQ 提供了不同类型的交换机,例如直接交换机、主题交换机、扇形交换机和头部交换机,每种类型都有不同的路由策略。

  3. Queue(队列)队列是消息的存储地点。它们是消息的终点,消费者从队列中获取消息并进行处理。每个队列都有一个名称,消费者可以根据队列名称订阅接收特定类型的消息。

  4. Consumer(消费者):消费者是从队列中接收消息并进行处理的应用程序组件。消费者订阅一个特定的队列,然后等待队列中的消息。一旦有新消息进入队列,消费者就会从队列中获取并处理消息。

  5. Binding(绑定):绑定是连接交换机队列的规则。它告诉交换机如何将消息路由到特定队列。绑定通常基于路由键(routing key)进行配置,路由键是生产者在发布消息时附加的标签,交换机使用它来确定将消息路由到哪个队列。

  6. Virtual Host(虚拟主机):虚拟主机是 RabbitMQ 服务器上的逻辑隔离单元。每个虚拟主机拥有自己的交换机、队列、绑定等资源,它们彼此隔离,可以帮助不同应用程序或团队之间进行资源隔离和权限控制。

  7. Broker(代理)RabbitMQ 代理是消息传递路由的核心组件。它负责处理消息的路由、存储和传递,确保消息按照预期进行处理。

除了上述核心组件外,RabbitMQ 还提供了管理界面(HTTP API 和 Web 控制台)以及插件系统,使用户能够扩展和定制 RabbitMQ 的功能。这些组件一起构成了 RabbitMQ 体系结构,使其能够实现高效的消息传递和处理。

# 5.Broker 和 Exchange 的区别?

BrokerExchange是在消息传递系统(如RabbitMQ)中的两个关键组件,它们有不同的功能和作用。

Broker(代理)

  • Broker 是整个消息传递系统的核心组件,它负责接收存储路由消息。
  • 它可以理解为消息传递系统的中心枢纽,负责协调消息的发送和接收。
  • Broker管理着消息的生命周期,包括消息的传递、持久化、分发等。
  • RabbitMQ中,Broker就是RabbitMQ服务器本身,它负责处理客户端的连接交换机队列等资源。

Exchange(交换机)

  • 交换机是消息路由的组件,用于确定消息应该被路由到哪个队列。
  • 它接收来自生产者的消息,并根据预定的路由规则将消息路由到一个或多个队列。
  • Exchange 的路由规则由绑定(Binding)来定义,绑定将交换机与队列关联起来,并指定了特定的路由键(Routing Key)。
  • 在 RabbitMQ 中,交换机有不同的类型(直接、主题、扇形等),每种类型都采用不同的路由策略来决定消息的传递。

总结区别:

  • Broker 是整个消息传递系统的核心,负责消息的存储、传递和管理。
  • Exchange 是消息路由的组件,负责将消息路由到特定的队列。
  • Broker包含了更多的功能,而Exchange则是Broker中的一个子组件,用于特定的消息路由任务。

# 二.基础组件

# 1.RabbitMQ 架构

  • Publisher - 生产者:发布消息到 RabbitMQ 中的 Exchange

  • Consumer - 消费者:监听 RabbitMQ 中的 Queue 中的消息

  • Exchange - 交换机:和生产者建立连接并接收生产者的消息

  • Queue - 队列:Exchange 会将消息分发到指定的 Queue,Queue 和消费者进行交互

  • Routes - 路由:交换机以什么样的策略将消息发布到 Queue

image-20240126160154803

# 2.RabbitMQ 核心概念

  • Message :指发送和接受的消息,包括消息头和消息体。
  • Publisher:消息的生产者,向交换器发布消息的应用程序,指我们程序中的生产者
  • Exchange:交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
  • Queue:消息队列,用来存放消息。
  • Binding:绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。Exchange 和 Queue 的绑定可以是多对多的关系。
  • Connection:网络连接,比如一个 TCP 连接。
  • Channel:信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的 TCP 连接内的虚拟连接, AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。
  • Consumer:消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
  • Virtual Host:虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。 vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。

RabbitMQ 整体上是一个生产者与消费者模型,主要负责接收、存储和转发消息。可以把消息传递的过程想象成:当你将一个包裹送到邮局邮局会暂存并最终将邮件通过邮递员送到收件人的手上,RabbitMQ 就好比由邮局、邮箱和邮递员组成的一个系统。从计算机术语层面来说,RabbitMQ 模型更像是一种交换机模型。

image-20231215010606749

# 3.Queue 消息队列

Queue(消息队列) 用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

RabbitMQ 中消息只能存储在队列中,这一点和 Kafka 这种消息中间件相反。Kafka 将消息存储在 topic(主题) 这个逻辑层面,而相对应的队列逻辑只是 topic 实际存储文件中的位移标识。 RabbitMQ 的生产者生产消息并最终投递到队列中,消费者可以从队列中获取消息并消费。

多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊(Round-Robin,即轮询)给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理,这样避免消息被重复消费。

RabbitMQ 不支持队列层面的广播消费,如果有广播消费的需求,需要在其上进行二次开发,这样会很麻烦,不建议这样做。

# 4.Broker

Broker 消息中间件的服务节点

对于 RabbitMQ 来说,一个 RabbitMQ Broker 可以简单地看作一个 RabbitMQ 服务节点,或者 RabbitMQ 服务实例。大多数情况下也可以将一个 RabbitMQ Broker 看作一台 RabbitMQ 服务器。

下图展示了生产者将消息存入 RabbitMQ Broker,以及消费者从 Broker 中消费数据的整个流程。

image-20231215010540719

# 5.Exchange 交换机

Exchange(交换器),在 RabbitMQ 中,消息并不是直接被投递到 Queue(消息队列) 中的,中间还必须经过 Exchange(交换器) 这一层,Exchange(交换器) 会把我们的消息分配到对应的 Queue(消息队列) 中。

Exchange(交换器) 用来接收生产者发送的消息并将这些消息路由给服务器中的队列中,如果路由不到,或许会返回给 Producer(生产者) ,或许会被直接丢弃掉 。这里可以将 RabbitMQ 中的交换器看作一个简单的实体。

RabbitMQ 的 Exchange(交换器) 有 4 种类型,不同的类型对应着不同的路由策略

  • direct(默认)
  • fanout
  • topic
  • headers

不同类型的 Exchange 转发消息的策略有所区别。

生产者将消息发给交换器的时候,一般会指定一个 RoutingKey(路由键),用来指定这个消息的路由规则,而这个 RoutingKey 需要与交换器类型和绑定键(BindingKey)联合使用才能最终生效

RabbitMQ 中通过 Binding(绑定)Exchange(交换器)Queue(消息队列) 关联起来,在绑定的时候一般会指定一个 BindingKey(绑定建) ,这样 RabbitMQ 就知道如何正确将消息路由到队列了,如下图所示。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。Exchange 和 Queue 的绑定可以是多对多的关系。

Binding(绑定) 示意图:

image-20230812120251545

生产者将消息发送给交换器时,需要一个 RoutingKey,当 BindingKeyRoutingKey 相匹配时,消息会被路由到对应的队列中。在绑定多个队列到同一个交换器的时候,这些绑定允许使用相同的 BindingKeyBindingKey 并不是在所有的情况下都生效,它依赖于交换器类型,比如 fanout 类型的交换器就会无视,而是将消息路由到所有绑定到该交换器的队列中.

# 6.direct 交换机

RabbitMQ消息代理中,Direct交换机是一种用于消息路由的一种类型的交换机。它是一种简单的交换机,它将消息从生产者发送到队列,通过使用与消息关联的路由键(routing key)来决定消息应该发送到哪个队列。Direct 交换机在路由键与绑定队列时指定的路由键完全匹配时,才将消息传递到相应的队列。

以下是 Direct 交换机的一些特性和使用方式:

  1. 路由键匹配: Direct 交换机使用消息的路由键与队列绑定时指定的路由键进行精确匹配。只有当消息的路由键与绑定队列的路由键完全相同时,消息才会被传递到该队列。

  2. 单播消息: Direct 交换机用于一对一的消息传递,其中消息生产者将消息发送到指定的交换机,并在消息中指定一个特定的路由键。只有使用与消息中的路由键完全匹配的队列才会接收消息。

  3. 多队列: 可以将多个队列绑定到同一个 Direct 交换机,并使用不同的路由键。消息的发送者可以根据需要选择要发送到的队列,只需在消息中指定相应的路由键。

  4. 无需匹配模式: 相较于其他类型的交换机(如 Topic 交换机),Direct 交换机不支持通配符匹配模式。它只能根据精确的路由键匹配来进行消息传递。

使用 Direct 交换机时的一个示例场景是,在一个名为"logs"的 Direct 交换机上,有两个队列分别绑定了不同的路由键:"error"和"info"。当生产者发送一条消息时,可以指定它的路由键为"error"或"info",分别将消息发送到与之匹配的队列上。

Direct 交换机适用于需要精确控制消息路由的情况,其中每个消息都被显式地发送到一个特定的队列,而不需要使用复杂的通配符或模式匹配。

Exchange 默认是此类型。绑定键(binding key)和路由键(routing key)完全匹配的队列才推送消息。它会把消息路由到那些 Bindingkey 与 RoutingKey 完全匹配的 Queue 中。

public class Producer {
    //交换机名称
    public static final String DIRECT_EXCHANGE="direct_exchange";

    public static void main(String[] args) throws Exception {
        //获取信道
        Channel channel = RabbitUtils.getChannel();
        //声明交换机
        channel.exchangeDeclare(DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT);
        //发送消息
        Scanner scanner = new Scanner(System.in);
        //发送消息
        while (true){
            System.out.print("请输入发送的消息:");
            String message=scanner.nextLine();
            System.out.print("请输入发送消息的路由:");
            String routingKey=scanner.nextLine();

            //发送消息
            channel.basicPublish(DIRECT_EXCHANGE,routingKey,null,message.getBytes("UTF-8"));
            System.out.println("生产者发送消息:"+message+",发送路由为:"+routingKey);
        }

    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

image-20240118163712641

# 7.fanout 交换机

"fanout" 是一种交换机类型,用于实现发布-订阅(Publish-Subscribe)消息模型。Fanout 交换机的工作原理是将接收到的消息广播给所有绑定到它的队列,无论队列是否具有共同的主题或关键字。

Fanout 交换机不关心消息的路由键(routing key),它只是将接收到的消息发送给所有与之绑定的队列。这种模式适用于需要一对多广播消息的场景,例如日志记录、实时通知等。

以下是 Fanout 交换机的一些关键特点:

  1. 消息广播: Fanout 交换机将消息广播给所有绑定到它的队列,无论队列是否具有相同的绑定规则。

  2. 无路由键: Fanout 交换机不关心消息的路由键,只需将消息发送到绑定的所有队列。

  3. 多播特性: Fanout 交换机实现了一对多的消息传递,适用于需要多个消费者同时接收消息的场景。

使用 Fanout 交换机时,消息的发送者将消息发布到 Fanout 交换机,然后交换机会将消息广播给所有与之绑定的队列,每个队列的消费者都可以获取到相同的消息副本。这使得多个消费者能够同时处理相同的消息,实现了消息的广播传递。

要创建一个 Fanout 交换机及其绑定的队列,您需要以下步骤:

  1. 创建 Fanout 交换机。
  2. 创建队列。
  3. 将队列绑定到 Fanout 交换机上。

以下是使用 RabbitMQ Java 客户端库的伪代码示例:

// 创建连接和通道
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

// 声明 Fanout 交换机
String exchangeName = "fanout_exchange";
channel.exchangeDeclare(exchangeName, "fanout");

// 声明队列
String queueName = "my_queue";
channel.queueDeclare(queueName, false, false, false, null);

// 将队列绑定到 Fanout 交换机
channel.queueBind(queueName, exchangeName, "");

// 发布消息到 Fanout 交换机
String message = "Hello, Fanout!";
channel.basicPublish(exchangeName, "", null, message.getBytes());

// 关闭通道和连接
channel.close();
connection.close();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

请注意,Fanout 交换机的消息传递方式是广播,因此可能会产生较高的资源开销,特别是在大规模系统中。如果您需要更精细的消息路由和过滤能力,可以考虑使用其他类型的交换机,如 Direct(直连)、Topic(主题)或 Headers(头部)交换机。

image-20231107000414939

# 8.topic 交换机

Topic 交换机是一种灵活的交换机类型,它允许你按照消息的路由键(Routing Key)模式来进行消息的发布订阅。每个队列都可以使用通配符模式来定义自己的路由键,以便匹配特定的消息。Topic 交换机的工作方式类似于发布-订阅模型,但它使用了更灵活的匹配规则

Topic 交换机的路由键由一个或多个单词组成,使用点号(.)来分隔。单词可以是任意字符串,而不仅仅是字母和数字。另外,路由键中可以使用两个特殊通配符:

  1. *:匹配一个单词,可以是任何内容。
  2. #:匹配零个或多个单词,可以是任何内容。

通过使用这些通配符,你可以创建非常灵活的消息路由规则。例如,如果你的队列绑定了路由键为 "*.critical" 的模式,那么它将接收所有以 .critical 结尾的消息,无论前面的单词是什么。如果你的队列绑定了路由键为 "stock.*",那么它将接收所有以 stock. 开头的消息。

使用 Topic 交换机,你可以实现复杂的消息路由逻辑,让不同的队列订阅特定类型的消息,同时保持灵活性和可扩展性。

以下是一个使用 RabbitMQ Topic 交换机的简单示例代码(使用 Python 的 Pika 库):

import pika

# 连接 RabbitMQ 服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 声明一个 topic 交换机
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

# 发布消息
routing_key = 'stock.usd'
message = 'This is a stock message in USD.'
channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message)

print(f" [x] Sent '{message}' with routing key '{routing_key}'")

# 关闭连接
connection.close()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

注意:在实际使用中,你需要根据你的实际需求来设置交换机、队列和绑定。上述示例仅为简单演示。

前面讲到 direct 类型的交换器路由规则是完全匹配 BindingKey 和 RoutingKey ,但是这种严格的匹配方式在很多情况下不能满足实际业务的需求。topic 类型的交换器在匹配规则上进行了扩展,它与 direct 类型的交换器相似,也是将消息路由到 BindingKey 和 RoutingKey 相匹配的队列中,但这里的匹配规则有些不同,它约定:

匹配一个:*
匹配0个或多个:#
一个队列同时满足2个条件:只会发一次消息
1
2
3
  • RoutingKey 为一个点号“.”分隔的字符串(被点号“.”分隔开的每一段独立的字符串称为一个单词),如 “com.rabbitmq.client”、“java.util.concurrent”、“com.hidden.client”;
  • BindingKey 和 RoutingKey 一样也是点号“.”分隔的字符串;
  • BindingKey 中可以存在两种特殊字符串*和“#”,用于做模糊匹配,其中星号用于匹配一个单词,“#”用于匹配多个单词(可以是零个)。

image-20231212223943867

以上图为例:

  • 路由键为 “com.rabbitmq.client” 的消息会同时路由到 Queue1 和 Queue2;
  • 路由键为 “com.hidden.client” 的消息只会路由到 Queue2 中;
  • 路由键为 “com.hidden.demo” 的消息只会路由到 Queue2 中;
  • 路由键为 “java.rabbitmq.demo” 的消息只会路由到 Queue1 中;
  • 路由键为 “java.util.concurrent” 的消息将会被丢弃或者返回给生产者(需要设置 mandatory 参数),因为它没有匹配任何路由键。

当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像 fanout 了 如果队列绑定键当中没有#和*出现,那么该队列绑定类型就是 direct 了

# 9.headers 交换机

headers 交换机是一种特殊类型的交换机,它与其他常见的交换机类型(如 directfanouttopic)不同,它使用消息的 header 属性来进行路由。

headers 交换机中,消息的路由不是基于路由键(routing key),而是基于消息的 header 属性。消息发布者在发送消息时可以附加一些键值对到消息的 header 中,而消费者在绑定队列到 headers 交换机时,可以指定一组匹配规则,这些规则也是基于键值对。

headers 交换机的路由规则支持多种逻辑运算符,包括 x-matchx-match 可以有两个值:"any" 和 "all"。如果设置为 "any",则表示消息的 header 只需匹配一个规则即可;如果设置为 "all",则表示消息的 header 必须匹配所有规则。

下面是一个使用 headers 交换机的简单示例:

  1. 创建 headers 交换机:
# 使用命令行创建一个名为 "headers_exchange" 的 headers 交换机
rabbitmqadmin declare exchange name=headers_exchange type=headers
1
2
  1. 发布消息:
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='headers_exchange', exchange_type='headers')

message_body = "Hello, RabbitMQ!"
message_properties = pika.BasicProperties(
    headers={
        "key1": "value1",
        "key2": "value2"
    }
)

channel.basic_publish(
    exchange='headers_exchange',
    routing_key='',  # Not used for headers exchange
    body=message_body,
    properties=message_properties
)

connection.close()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
  1. 消费消息:
import pika

def callback(ch, method, properties, body):
    print("Received:", body)

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='headers_exchange', exchange_type='headers')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

bindings = {
    "x-match": "any",
    "key1": "value1"
}

channel.queue_bind(exchange='headers_exchange', queue=queue_name, arguments=bindings)
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)

print('Waiting for messages...')
channel.start_consuming()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

在上述示例中,消息被发送到了一个名为 "headers_exchange" 的 headers 交换机,然后通过规则匹配,被绑定到具有指定键值对的队列进行消费。

请注意,headers 交换机适用于需要更复杂的消息路由逻辑,特别是当你想基于消息的属性而不是固定的路由键来进行消息路由时。

# 10.Producer(生产者)

Producer(生产者) :生产消息的一方(邮件投递者)

消息一般由 2 部分组成:消息头(或者说是标签 Label)和 消息体。消息体也可以称为 payLoad ,消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括 routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。生产者把消息交由 RabbitMQ 后,RabbitMQ 会根据消息头把消息发送给感兴趣的 Consumer(消费者)。

# 11.Consumer(消费者)

Consumer(消费者) :消费消息的一方(邮件收件人)

消费者

  • 消费消息,也就是接收消息的一方。
  • 消费者连接到 RabbitMQ 服务器,并订阅到队列上。消费消息时只消费消息体,丢弃标签。

# 12.监控管理

image-20230812121004774

# 三.各种队列

# 1.什么是死信队列?

DLX,全称为 Dead-Letter-Exchange,死信交换器,死信邮箱。当消息在一个队列中变成死信 (dead message) 之后,它能被重新被发送到另一个交换器中,这个交换器就是 DLX,绑定 DLX 的队列就称之为死信队列。

导致的死信的几种原因

  • 消息被拒(Basic.Reject /Basic.Nack) 且 requeue = false
  • 消息 TTL 过期。
  • 队列满了,无法再添加。
Channel channel= RabbitMqUtils.getChannel();
//声明死信和普通交换机类型为direct
channel.exchangeDeclare(NORMAL_EXCHANGE,BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT)
//声明普通队列
Map<String,Object> arguments = new HashMap<>();
//过期时间10s=10000ms
arguments.put("x-message-tt1,100000):
//正常队列设置死信交换机
arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
//设置死信RoutingKey
arguments.put("x-dead-letter-routing-key","lisi");
channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);
1
2
3
4
5
6
7
8
9
10
11
12
13

# 2.什么是延迟队列?

延迟队列指的是存储对应的延迟消息,消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。

RabbitMQ 本身是没有延迟队列的,要实现延迟消息,一般有两种方式:

  1. 通过 RabbitMQ 本身队列的特性来实现,需要使用 RabbitMQ 的死信交换机(Exchange)和消息的存活时间 TTL(Time To Live)。
  2. 在 RabbitMQ 3.5.7 及以上的版本提供了一个插件(rabbitmq-delayed-message-exchange)来实现延迟队列功能。同时,插件依赖 Erlang/OPT 18.0 及以上。

也就是说,AMQP 协议以及 RabbitMQ 本身没有直接支持延迟队列的功能,但是可以通过 TTLDLX 模拟出延迟队列的功能。

# 3.延迟队列使用场景?

  • 订单在十分钟之内未支付则自动取消
  • 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
  • 用户注册成功后,如果三天内没有登陆则进行短信提醒。
  • 用户发起退款,如果三天内没有得到处理则通知相关运营人员。
  • 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议

# 4.什么是镜像队列?

默认情况下,RabbitMQ 集群中的队列只会存储在某一个节点上,就是队列声明的那个节点上。当访问集群中的其他节点时,会把请求转发给这个节点来进行处理。当这个节点故障时,集群中的这个队列就表现为不可用。队列可以在多个节点中复制镜像以保障可用性,称之为镜像队列。

每一个镜像队列由一个 master 和若干个 slave 组成。队列的 master 通常存储在集群的主节点上,没个队列有自己的主节点,镜像队列的所有操作都会首先在 mastEr 上执行然后广播给其他镜像。包括消息入队,推送给消费者、和消费确认等。

生产者发送的消息会在所有的镜像中存储一份副本,消费者不论连接哪个节点最终都会在 master 上操作,一旦 master 确认消费(ack)以后,镜像队列会丢弃这条消息。因此镜像队列虽然增加了可用性(存在多个可用副本),但是多个节点间并没有分摊负载,因为所有节点都会处理全量的消息。

如果镜像队列的 master 宕机了,最老的镜像将会晋升为 新的 master。未同步的镜像也可以晋升为 master,取决于队列的镜像参数。

# 5.优先级队列?

RabbitMQ 在 3.5.0 版本的时候提供了优先级队列的实现。客户端通过配置队列的 x-max-priority 参数的方式设置一个队列支持的最大优先级(但是不能使用策略的方式配置)以此来声明一个优先级队列。优先级最大值为 255、最小值为 0(默认值),推荐1 ~ 10。生产者可以通过设置 Basic.Properties 的 priority 属性设置消息的优先级(值越大,优先级越高)。优先级越高,越先被消费者消费,但是带来的内存、磁盘、CPU 开销越高。

如果消费者的消费速度远低于生产者生产消息的速度、Broker 有消息积压的情况下,对消息设置的优先级才有意义。

生产者


public class TestProducer {

  public static void main(String[] args) throws Exception {
      ConnectionFactory connectionFactory = new ConnectionFactory();
      connectionFactory.setUsername("admin");
      connectionFactory.setPassword("admin");
      connectionFactory.setPort(5672);
      connectionFactory.setHost("127.0.0.1");
      connectionFactory.setVirtualHost("/");
      Connection connection = connectionFactory.newConnection();
      Channel channel = connection.createChannel();
      for (int i = 0; i < 10; i++) {
          AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder().priority(i).build();
          channel.basicPublish("test-direct-exchange", "test-routing-key", basicProperties, ("hello " + i).getBytes());
      }
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

消费者

public class TestConsumer {

    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setPort(5672);
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setVirtualHost("/");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("test-direct-exchange", BuiltinExchangeType.DIRECT, true);
        Map<String, Object> properties = new HashMap<>();
        properties.put("x-max-priority", 10);
        channel.queueDeclare("test-queue", true, false, false, properties);
        channel.queueBind("test-queue", "test-direct-exchange", "test-routing-key");
        channel.basicQos(1);
        channel.basicConsume("test-queue", false, "test-consumer-tag", new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收消息 >>> " + new String(body));
                channel.basicAck(envelope.getDeliveryTag(), true);
            }
        });
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

# 6.惰性队列?

正常情况:消息是保存在内存中 惰性队列:消息是保存在磁盘中

RabbitMQ 从 3.6.0 开始引入了惰性队列(Lazy Queue)的概念。惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要设计目标是能够支持更长的队列,即支持更多的消息存储。当消费者由于各种各样的原因(比如消费者下线、跌机、或者由于维护而关闭等)致使长时间不能消费消息而造成堆积时,惰性队列就很必要了。

默认情况下,当生产者将消息发送到 RabbitMQ 的时候,队列中的消息会尽可能地存储在内存之中,这样可以更加快速地将消息发送给消费者。即使是持久化的消息,在被写入磁盘的同时也会在内存中驻留一份备份。当 RabbitMQ 需要释放内存的时候,会将内存中的消息换页至磁盘中,这个操作会耗费较长的时间,也会阻塞队列的操作,进而无法接收新的消息。虽然 RabbitMQ 的开发者们一直在升级相关的算法,但是效果始终不太理想,尤其是在 消息量特别大的时候。

惰性队列会将接收到的消息直接存入文件系统,而不管是持久化的或者是非持久化的,这样可以减少内存的消耗,但是会增加 I/O 的使用,如果消息是持久化的,那么这样的 I/O 操作不可避免,惰性队列和持久化的消息可谓是“最佳拍档”。注意如果惰性队列中存储的是非持久化的消息,内存的使用率会一直很稳定,但是重启之后消息一样会丢失。

队列具备两种模式:defaultlazy。默认的为 default 模式,在 3.6.0 的版本无需做任何变更。lazy 模式即为惰性队列的模式,可以通过调用 channel.queueDeclare 方法的时候在参数中设置,也可以通过 Policy 的方式设置,如果一个队列同时使用这两种方式设置,那么 Policy 的方式具备更高的优先级。如果要通过声明的方式改变已有队列的模式,那么只能先删除队列,然后再重新声明一个新的。

  Map<String, Object> args = new HashMap<String, Object>();
  args.put("x-queue-mode", "lazy");
  channel.queueDeclare("myqueue", false, false, false, args);
1
2
3

# 四.底层原理

# 1.手动应答消息

使用手动应答的好处,如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。

相应的,使用手动应答时,需要把 autoAck 属性设置为 false,然后进行手动应答。 消息手动应答 有如下几个方法

  • A.Channel.basicAck(用于肯定确认),RabbitMQ 已知道该消息并且成功的处理消息, 可以将其丢弃了
  • B.Channel.basicNack(用于否定确认)
  • C.Channel.basicReject(用于否定确认),与 Channel.basicNack 相比少一个参数,不处理该消息了直接拒绝,可以将其丢弃了

手动应答时还有一个参数:Multiple 是否批量处理,一般选择 false ,不批量处理。

@Slf4j
public class ExerciseW2 {
  private static final String QUEUE_NAME = "task_queue";

  public static void main(String[] args) throws IOException, TimeoutException {
      ConnectionFactory factory = new ConnectionFactory();
      factory.setHost("192.168.121.36");
      factory.setUsername("admin");
      factory.setPassword("123");
      Connection connection = factory.newConnection();
      Channel channel = connection.createChannel();
      channel.queueDeclare(QUEUE_NAME, true, false, false, null);

      DeliverCallback deliverCallback = (consumerTag, message) -> {
          String msg = new String(message.getBody(), "UTF-8");
          log.info("W2接收到消息:" + msg);
          try {
              Thread.sleep(2000);
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
          //手动应答消息
          //1st param message tag   2.multiple: true 批量应答  false:不批量应答,实际开发中
          //使用false
          channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
      };
      CancelCallback cancelCallback = consumerTag -> {
          log.info("W2取消消息发送" + consumerTag);
      };
      channel.basicConsume(QUEUE_NAME, false, deliverCallback, cancelCallback);

  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

# 2.自动应答?

image-20240126140410714

这个属性为 trueRabbitMQ 就会开启自动应答。但是使用自动应答时需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢失了,也有可能因为没有对传递的消息数量进行限制,导致消息积压,有可能把内存耗尽。

# 3.RabbtiMQ 的持久化?

RabbtiMQ 的持久化分为队列持久化、消息持久化和交换器持久化。不管是持久化的消息还是非持久化的消息都可以被写入到磁盘。 持久化消息会同时写入磁盘和内存(加快读取速度),非持久化消息会在内存不够用时,将消息写入磁盘(一般重启之后就没有了)。

  • 队列持久化
  • 消息持久化
  • 交换器持久化

# 4.队列的持久化?

队列的持久化是在定义队列时的 durable 参数来决定的,当 durable 为 true 时,才代表队列会持久化。

关键的是第二个参数设置为 true,即 durable=true.

Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//第二个餐胡设置为true,代表队列持久化
channel.queueDeclare("queue.persistent.name", true, false, false, null);
1
2
3
4

queueDeclare方法定义

Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;
1
  • queue:queue 的名称
  • durable:是否持久化
  • exclusive:排他队列,如果一个队列被声明为排他队列,该队列仅对首次申明它的连接可见,并在连接断开时自动删除。这里需要注意三点:
    • 排他队列是基于连接可见的,同一连接的不同信道是可以同时访问同一连接创建的排他队列;
    • “首次”,如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同;
    • 即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的,这种队列适用于一个客户端发送读取消息的应用场景。
  • autoDelete:自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列

# 5.消息持久化?

如果将 queue 的持久化标识 durable 设置为 true,则代表是一个持久的队列,那么在服务重启之后,也会存在,因为服务会把持久化的 queue 存放在硬盘上,当服务重启的时候,会重新加载之前被持久化的 queue。队列是可以被持久化,但是里面的消息是否为持久化那还要看消息的持久化设置。

也就是说,重启之前那个 queue 里面还没有发出去的消息的话,重启之后那队列里面是不是还存在原来的消息,这个就要取决于发生着在发送消息时对消息的设置了。

如果要在重启后保持消息的持久化必须设置消息是持久化的标识。

MessageProperties.PERSISTENT_PLAIN
1
//通过传入MessageProperties.PERSISTENT_PLAIN就可以实现消息持久化
channel.basicPublish("exchange.persistent", "persistent", MessageProperties.PERSISTENT_TEXT_PLAIN, "persistent_test_message".getBytes());
1
2

basicPublish方法定义

void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
        throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
        throws IOException;
1
2
3
4
5
  • exchange:表示 exchange 的名称
  • routingKey:表示 routingKeyl 的名称
  • mandatory:标志告诉服务器至少将该消息 route 到一个队列中,否则将消息返还给生产者
  • immediate:标志告诉服务器如果该消息关联的 queue 上有消费者,则马上将消息投递给它,如果所有 queue 都没有消费者,直接把消息返还给生产者,不用将消息入队列等待消费者了。
  • body: 代表发送的消息体
public BasicProperties(
            String contentType,//消息类型如:text/plain
            String contentEncoding,//编码
            Map<String,Object> headers,
            Integer deliveryMode,//持久化字段 1:nonpersistent 2:persistent
            Integer priority,//优先级
            String correlationId,
            String replyTo,//反馈队列
            String expiration,//expiration到期时间
            String messageId,
            Date timestamp,
            String type,
            String userId,
            String appId,
            String clusterId)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static final BasicProperties PERSISTENT_TEXT_PLAIN =
    new BasicProperties("text/plain",
                        null,
                        null,
                        2,//对应持久化deliveryMode
                        0, null, null, null,
                        null, null, null, null,
                        null, null);
1
2
3
4
5
6
7
8

# 6.交换机持久化?

如果不设置 exchange 的持久化对消息的可靠性来说没有什么影响,但是同样如果 exchange 不设置持久化,那么当 broker 服务重启之后 exchange 将不复存在,那么既而发送方 RabbitMQ producer 就无法正常发送消息。这里建议,同样设置 exchange 的持久化。exchange 的持久化设置也特别简单,方法如下:

durable 的字段为 true 即可

Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException;
Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete,
                                   Map<String, Object> arguments) throws IOException;
Exchange.DeclareOk exchangeDeclare(String exchange, String type) throws IOException;
Exchange.DeclareOk exchangeDeclare(String exchange,
                                          String type,
                                          boolean durable,
                                          boolean autoDelete,
                                          boolean internal,
                                          Map<String, Object> arguments) throws IOException;
void exchangeDeclareNoWait(String exchange,
                           String type,
                           boolean durable,
                           boolean autoDelete,
                           boolean internal,
                           Map<String, Object> arguments) throws IOException;
Exchange.DeclareOk exchangeDeclarePassive(String name) throws IOException;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//durable为true则开启持久化
Exchange.DeclareOk exchangeDeclare(String exchange,String type,boolean durable)throws IOException
1
2
  • exchange 交换器的名称
  • type 交换器的类型,常见的如 fanout direct topic
  • durable:设置是否持久 durable 设置为 true 表示持久化,反之是非持久。持久化可以将交换器存盘,在服务器重启的时候不会丢失相关信息。

# 7.不公平分发?

使用不公平分发(能者多劳),生产者和消费者都需要设置参数 channel.basicQos(1);

消费者 1:消费过程的代码

public class MessageQosConsumer1 {
    static final String QUEUE_NAME = QOS_QUEUE_NAME;

    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.19.128");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        // 设置持久化
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        channel.basicQos(1);
        Consumer callback = new DefaultConsumer(channel) {
            @SneakyThrows
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //路由的key
                String routingKey = envelope.getRoutingKey();
                //获取交换机信息
                String exchange = envelope.getExchange();
                //获取消息ID
                long deliveryTag = envelope.getDeliveryTag();
                //获取消息信息
                String message = new String(body, StandardCharsets.UTF_8);
                System.out.println("routingKey:" + routingKey +
                        ",exchange:" + exchange +
                        ",deliveryTag:" + deliveryTag +
                        ",message:" + message);

                // 模拟处理任务的时间
                // 消费者1设置为5s,消费者2设置为1s
                TimeUnit.SECONDS.sleep(5);

                // deliveryTag是消息标记tag ,设置为不批量应答
                channel.basicAck(deliveryTag, false);
            }
        };
        // 使用手动应答
        channel.basicConsume(QUEUE_NAME, false, callback);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

# 8.预取值?

通过设置生产者和消费者的 channel.basicQos(1) 来实现消息的不公平分发。那么这个 1 是什么含义呢?这个就是当前消费者理论上获取的消息条数

void basicQos(int prefetchCount) throws IOException;
1

本身消息的发送就是异步发送的,所以在任何时候,channel 上肯定不止只有一个消息另外来自消费者的手动确认本质上也是异步的。因此这里就存在一个未确认的消息缓冲区,因此希望开发人员能限制此缓冲区的大小,以避免缓冲区里面无限制的未确认消息问题。这个时候就可以通过使用 basic.qos 方法设置“预取计数”值来完成的。该值定义通道上允许的未确认消息的最大数量。一旦数量达到配置的数量,RabbitMQ 将停止在通道上传递更多消息,除非至少有一个未处理的消息被确认,例如,假设在通道上有未确认的消息 5、6、7,8,并且通道的预取计数设置为 4,此时 RabbitMQ 将不会在该通道上再传递任何消息,除非至少有一个未应答的消息被 ack。比方说 tag=6 这个消息刚刚被确认 ACK,RabbitMQ 将会感知这个情况到并再发送一条消息。消息应答和 QoS 预取值对用户吞吐量有重大影响。通常,增加预取将提高向消费者传递消息的速度。虽然自动应答传输消息速率是最佳的,但是,在这种情况下已传递但尚未处理的消息的数量也会增加,从而增加了消费者的 RAM 消耗(随机存取存储器)应该小心使用具有无限预处理的自动确认模式或手动确认模式,消费者消费了大量的消息如果没有确认的话,会导致消费者连接节点的内存消耗变大,所以找到合适的预取值是一个反复试验的过程,不同的负载该值取值也不同 100 到 300 范围内的值通常可提供最佳的吞吐量,并且不会给消费者带来太大的风险。预取值为 1 是最保守的。当然这将使吞吐量变得很低,特别是消费者连接延迟很严重的情况下,特别是在消费者连接等待时间较长的环境中。对于大多数应用来说,稍微高一点的值将是最佳的。

# 9.发布确认原理?

生产者将信道 Q 设置为 confirm 模式,一旦信道进入 confirm 模式,所有再该信道上面发布的消息都会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列后,broker 就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的地的队列了,如果消息和队列是可以持久化的,那么确认消息就会将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 deliver--tag 域包含了确认消息的序列号,此外 broker 也可以设置 basic,ackmultiple 域,标识到这个序列号之前所有的消息都已经得到了处理。

confirm 模式最大的好处在于,它是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时,继续发送下一条消息,当消息最终端得到确认之后,生产者应用便可以通过回调 Q 方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息,生产者应用程序同样可以再回调方法中处理该 nack 消息。

生产者将消息发送到队列中,消息写在磁盘上,达到持久化的目标,RabbitMQ 再给消息发送消息确认,才能够达到消息永不丢失的目的

发布确认默认没有开始,如果要开启需要调用 confirmSelect 方法,每当想要使用发布确认,都需要在 channel 上调用该方法

//一个连接中有多个信道
//获取信道
Channel channel = connection.createChannel();
//开启发布确认
channel.confirmSelect();
1
2
3
4
5

# 10.单个发布确认?

简单的确认方式,它是一种同步确认发布模式。意味着生产者发布一个消息后,需等待确认结果,才能够发布下一个消息。 缺点:效率低,发布慢。

//单个确认,结果耗时722ms
public static  void publishMessageIndividually() throws Exception {
    Channel channel = RabbitMqUtiles.getChannel();
    //队列声明
    String queueName = UUID.randomUUID().toString();
    channel.queueDeclare(queueName,true,false,false,null);
    //开启确认模式
    channel.confirmSelect();
    //开始时间
    long beginTime = System.currentTimeMillis();
    for (int i = 0;i< 1000 ;i++){
        String mes = i + "";
        channel.basicPublish("",queueName,null,mes.getBytes());
        //单个消息就马上进行发布确认
        boolean b = channel.waitForConfirms();
        if (b){
            System.out.println("消息发布确认成功");
        }
    }
    //结束时间
    long endTime = System.currentTimeMillis();
    System.out.println("发布消息1000条单独确认消息,用时:"+(endTime-beginTime));
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

# 11.批量发布确认?

缺点:当发生故障,导致发布出现问题时,不知道是哪个消息出现问题。并且这种方案仍然是同步的,一样阻塞消息的发布。 优点:性能比单个确认发布高一点。

//批量确认,每100条确认一次  147ms
public static  void publishMessageBatch() throws Exception {
    Channel channel = RabbitMqUtiles.getChannel();
    //队列声明
    String queueName = UUID.randomUUID().toString();
    channel.queueDeclare(queueName,true,false,false,null);
    //开启确认模式
    channel.confirmSelect();
    //开始时间
    long beginTime = System.currentTimeMillis();
    int y =  100;
    for (int i = 0;i< 1000 ;i++){
        String mes = i + "";
        channel.basicPublish("",queueName,null,mes.getBytes());
        if (i%y ==  0){
            //确认发布
            boolean b = channel.waitForConfirms();
            if (b){
                System.out.println("消息发布确认成功");
            }
        }
    }
    //结束时间
    long endTime = System.currentTimeMillis();
    System.out.println("发布消息1000条批量确认消息,用时:"+(endTime-beginTime));
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

# 12.异步发布确认?

异步确认虽然编程逻辑比其他两个复杂,但是性价比最高。它是利用回调函数,来达到消息可靠性传递,这个中间件也是通过函数来保证是否投递成功。 broker:消息的实体,包含交换机和队列等。异步通知 无论消息队列是否收到,都会对生产者进行应答。确认应答、未确认应答。

结论 :建议使用异步确认方式,单个确认耗时太久,批量确认无法判断具体未确认消息节点。

需要开启对消息的监听。

//添加异步确认,消息监听器
//ConfirmCallback ackCallback, ConfirmCallback nackCallback
 /**
 *  ackCallback 监听哪些消息确认成功了
  *  nackCallback 监听哪些消息确认失败了
     */
 channel.addConfirmListener(ackCallback,nackCallback);
1
2
3
4
5
6
7
//确认消息,回调处理
/**
* deliveryTag  消息的标价
* multiple 是否为批量确认
*/
ConfirmCallback ackCallback = (long deliveryTag, boolean multiple)->{
  System.out.println("消息确认"+deliveryTag);
};
//未确认消息,回调处理
ConfirmCallback nackCallback = (long deliveryTag, boolean multiple) ->{
  System.out.println("消息未确认"+deliveryTag);
};
1
2
3
4
5
6
7
8
9
10
11
12
//异步确认 62ms
public static  void publishMessageAsync() throws Exception {
  Channel channel = RabbitMqUtiles.getChannel();
  //队列声明
  String queueName = UUID.randomUUID().toString();
  channel.queueDeclare(queueName,true,false,false,null);
  //开启确认模式
  channel.confirmSelect();

  //开始时间
  long beginTime = System.currentTimeMillis();

  //确认消息,回调处理
  /**
   * deliveryTag  消息的标价
   * multiple 是否为批量确认
   */
  ConfirmCallback ackCallback = (long deliveryTag, boolean multiple)->{
      System.out.println("消息确认"+deliveryTag);
  };
  //未确认消息,回调处理
  ConfirmCallback nackCallback = (long deliveryTag, boolean multiple) ->{
      System.out.println("消息未确认"+deliveryTag);
  };
  //添加异步确认,消息监听器
  //ConfirmCallback ackCallback, ConfirmCallback nackCallback
  /**
   *  ackCallback 监听哪些消息确认成功了
   *  nackCallback 监听哪些消息确认失败了
   */
  channel.addConfirmListener(ackCallback,nackCallback);
  for (int i = 0;i< 1000 ;i++){
      String mes = i + "";
      channel.basicPublish("",queueName,null,mes.getBytes());
  }
  //结束时间
  long endTime = System.currentTimeMillis();
  System.out.println("发布消息1000条批量确认消息,用时:"+(endTime-beginTime));
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

# 13.何处理未确认消息处理?

最好的解决方案,将未确认消息放在一个基于内存的,能被发布线程访问的队列,比如说 ConcurrentLinkedQueue 这个队列(并发链路式队列),confirm callbacks(确认回调)与发布线程之间进行消息传递。它可以在确认回调与发布线程之间进行消息传递。 此处是使用 ConcurrentSkipListMap,用来记录消息与消息的标识。

两步:

  1. 发送消息时,将消息和消息标记,添加到 map 中
  2. 确认成功后,将消息移除,则容器中剩下的都是未确认的消息。
//异步确认 62ms
public static  void publishMessageAsync() throws Exception {
    Channel channel = RabbitMqUtiles.getChannel();
    //队列声明
    String queueName = UUID.randomUUID().toString();
    channel.queueDeclare(queueName,true,false,false,null);
    //开启确认模式
    channel.confirmSelect();

    /**
     * 准备一个线程 安全、有序的哈希表  适用于高并发的情况下
     * 1.轻松地将序号与消息进行关联
     * 2.轻松的批量删除条目,只要给序号
     * 3.支持高并发(多线程)
      */
    ConcurrentSkipListMap<Long,String> skipListMap = new ConcurrentSkipListMap();

    //确认消息,回调处理
    /**
     * deliveryTag  消息的标价
     * multiple 是否为批量确认
     */
    ConfirmCallback ackCallback = (long deliveryTag, boolean multiple)->{
        //2.删除掉已经确认的消息
        if (multiple){
            ConcurrentNavigableMap<Long, String> confiemed = skipListMap.headMap(deliveryTag);
            confiemed.clear();
        }else {
            skipListMap.remove(deliveryTag);
        }
        System.out.println("消息确认"+deliveryTag);
    };
    //未确认消息,回调处理
    ConfirmCallback nackCallback = (long deliveryTag, boolean multiple) ->{
        String s = skipListMap.get(deliveryTag);
        System.out.println("消息未确认"+deliveryTag);
        System.out.println("消息未确认"+s);
    };
    //添加异步确认,消息监听器
    //ConfirmCallback ackCallback, ConfirmCallback nackCallback
    /**
     *  ackCallback 监听哪些消息确认成功了
     *  nackCallback 监听哪些消息确认失败了
     */
    channel.addConfirmListener(ackCallback,nackCallback);
    //开始时间
    long beginTime = System.currentTimeMillis();
    for (int i = 0;i< 1000 ;i++){
        String mes = i + "";
        channel.basicPublish("",queueName,null,mes.getBytes());
        //1.此处记录下所有要发送的消息,消息总和
        skipListMap.put(channel.getNextPublishSeqNo(), mes);
    }

    //结束时间
    long endTime = System.currentTimeMillis();
    System.out.println("发布消息1000条批量确认消息,用时:"+(endTime-beginTime));
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58

# 五.常见问题

# 1.RabbitMQ 工作模式?

  • 简单模式
  • work 工作模式
  • pub/sub 发布订阅模式
  • Routing 路由模式
  • Topic 主题模式

# 2.RabbitMQ 消息怎么传输?

由于 TCP 链接的创建和销毁开销较大,且并发数受系统资源限制,会造成性能瓶颈,所以 RabbitMQ 使用信道的方式来传输数据。信道(Channel)是生产者、消费者与 RabbitMQ 通信的渠道,信道是建立在 TCP 链接上的虚拟链接,且每条 TCP 链接上的信道数量没有限制。就是说 RabbitMQ 在一条 TCP 链接上建立成百上千个信道来达到多个线程处理,这个 TCP 被多个线程共享,每个信道在 RabbitMQ 都有唯一的 ID,保证了信道私有性,每个信道对应一个线程使用。

# 3.如何保证消息的可靠性?

消息到 MQ 的过程中搞丢,MQ 自己搞丢,MQ 到消费过程中搞丢。

  • 生产者到 RabbitMQ:事务机制和 Confirm 机制,注意:事务机制和 Confirm 机制是互斥的,两者不能共存,会导致 RabbitMQ 报错。
  • Confirm 机制:生产者发的消息到交换机,会收到成功和失败的回调,失败了需要重新发送
  • RabbitMQ 自身:持久化、集群、普通模式、镜像模式。
  • RabbitMQ 到消费者:basicAck 机制、死信队列、消息补偿机制。

# 4.如何保证消息的顺序性?

  • 拆分多个 queue(消息队列),每个 queue(消息队列) 一个 consumer(消费者),就是多一些 queue (消息队列)而已,确实是麻烦点;
  • 或者就一个 queue (消息队列)但是对应一个 consumer(消费者),然后这个 consumer(消费者)内部用内存队列做排队,然后分发给底层不同的 worker 来处理。

# 5.如何保证高可用的?

RabbitMQ 是比较有代表性的,因为是基于主从(非分布式)做高可用性的,我们就以 RabbitMQ 为例子讲解第一种 MQ 的高可用性怎么实现。RabbitMQ 有三种模式:单机模式、普通集群模式、镜像集群模式。

单机模式

Demo 级别的,一般就是你本地启动了玩玩儿的?,没人生产用单机模式。

普通集群模式

意思就是在多台机器上启动多个 RabbitMQ 实例,每个机器启动一个。你创建的 queue,只会放在一个 RabbitMQ 实例上,但是每个实例都同步 queue 的元数据(元数据可以认为是 queue 的一些配置信息,通过元数据,可以找到 queue 所在实例)。

你消费的时候,实际上如果连接到了另外一个实例,那么那个实例会从 queue 所在实例上拉取数据过来。这方案主要是提高吞吐量的,就是说让集群中多个节点来服务某个 queue 的读写操作。

镜像集群模式

这种模式,才是所谓的 RabbitMQ 的高可用模式。跟普通集群模式不一样的是,在镜像集群模式下,你创建的 queue,无论元数据还是 queue 里的消息都会存在于多个实例上,就是说,每个 RabbitMQ 节点都有这个 queue 的一个完整镜像,包含 queue 的全部数据的意思。然后每次你写消息到 queue 的时候,都会自动把消息同步到多个实例的 queue 上。RabbitMQ 有很好的管理控制台,就是在后台新增一个策略,这个策略是镜像集群模式的策略,指定的时候是可以要求数据同步到所有节点的,也可以要求同步到指定数量的节点,再次创建 queue 的时候,应用这个策略,就会自动将数据同步到其他的节点上去了。

这样的好处在于,你任何一个机器宕机了,没事儿,其它机器(节点)还包含了这个 queue 的完整数据,别的 consumer 都可以到其它节点上去消费数据。坏处在于,第一,这个性能开销也太大了吧,消息需要同步到所有机器上,导致网络带宽压力和消耗很重!RabbitMQ 一个 queue 的数据都是放在一个节点里的,镜像集群下,也是每个节点都放这个 queue 的完整数据。

# 6.如何解决消息积压问题?

先分析积压的原因,再给出解决方案.

临时紧急扩容。先修复 consumer 的问题,确保其恢复消费速度,然后将现有 cnosumer 都停掉。新建一个 topicpartition 是原来的 10 倍,临时建立好原先 10 倍的 queue 数量。然后写一个临时的分发数据的 consumer 程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的 10 倍数量的 queue。接着临时征用 10 倍的机器来部署 consumer,每一批 consumer 消费一个临时 queue 的数据。这种做法相当于是临时将 queue 资源和 consumer 资源扩大 10 倍,以正常的 10 倍速度来消费数据。等快速消费完积压数据之后,得恢复原先部署的架构,重新用原先的 consumer 机器来消费消息。

# 7.延时以及过期失效问题?

RabbtiMQ 是可以设置过期时间的,也就是 TTL。如果消息在 queue 中积压超过一定的时间就会被 RabbitMQ 给清理掉,这个数据就没了。那这就是第二个坑了。这就不是说数据会大量积压在 MQ 里,而是大量的数据会直接搞丢。我们可以采取一个方案,就是批量重导,这个我们之前线上也有类似的场景干过。就是大量积压的时候,我们当时就直接丢弃数据了,然后等过了高峰期以后,比如大家一起喝咖啡熬夜到晚上 12 点以后,用户都睡觉了。这个时候我们就开始写程序,将丢失的那批数据,写个临时程序,一点一点的查出来,然后重新灌入 mq 里面去,把白天丢的数据给他补回来。也只能是这样了。假设 1 万个订单积压在 mq 里面,没有处理,其中 1000 个订单都丢了,你只能手动写程序把那 1000 个订单给查出来,手动发到 mq 里去再补一次。

# 8.消息重复消费?

消费者在消费 MQ 中的消息时,MQ 已把消息发送给消费者,消费者在给 MQ 返回时网络中断,故 MQ 未收到确认信息,该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息。

解决思路:``MQ 消费者的幂等性的解决一般使用全局 ID 或者写个唯一标识比如时间戳或者 UUID 或者订单消费者消费 MQ 中的消息也可利用 MQ 的该 id 来判断,或者可按自己的规则生成一个全局唯一ID,每次消费消息时用该 id 先判断该消息是否已消费过。

上次更新: 10/29/2024, 10:27:50 AM