# 一.基础知识

# 1.为什么要用 MQ

消息队列是一种“先进先出”的数据结构

image-20230813183402738

其应用场景主要包含以下 3 个方面

  • 应用解耦
  • 流量削峰
  • 数据分发

应用解耦:

系统的耦合性越高,容错性就越低。以电商应用为例,用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障或者因为升级等原因暂时不可用,都会造成下单操作异常,影响用户使用体验。

使用消息队列解耦合,系统的耦合性就会提高了。比如物流系统发生故障,需要几分钟才能来修复,在这段时间内,物流系统要处理的数据被缓存到消息队列中,用户的下单操作正常完成。当物流系统回复后,补充处理存在消息队列中的订单消息即可,终端系统感知不到物流系统发生过几分钟故障。

流量削峰:

应用系统如果遇到系统请求流量的瞬间猛增,有可能会将系统压垮。有了消息队列可以将大量请求缓存起来,分散到很长一段时间处理,这样可以大大提到系统的稳定性和用户体验。

一般情况,为了保证系统的稳定性,如果系统负载超过阈值,就会阻止用户请求,这会影响用户体验,而如果使用消息队列将请求缓存起来,等待系统处理完毕后通知用户下单完毕,这样总不能下单体验要好。

业务系统正常时段的 QPS 如果是 1000,流量最高峰是 10000,为了应对流量高峰配置高性能的服务器显然不划算,这时可以使用消息队列对峰值流量削峰

数据分发:

通过消息队列可以让数据在多个系统更加之间进行流通。数据的产生方不需要关心谁来使用数据,只需要将数据发送到消息队列,数据使用方直接在消息队列中直接获取数据即可

# 2.MQ 的优点和缺点

优点:

  • 应用解耦
  • 流量削峰
  • 数据分发

缺点包含以下几点:

  • 系统可用性降低:系统引入的外部依赖越多,系统稳定性越差。一旦 MQ 宕机,就会对业务造成影响。

  • 系统复杂度提高:MQ 的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过 MQ 进行异步调用。

  • 一致性问题:A 系统处理完业务,通过 MQ 给 B、C、D 三个系统发消息数据,如果 B 系统、C 系统处理成功,D 系统处理失败。

# 3.什么是 RocketMQ?

Apache RocketMQ 是一款典型的分布式架构下的中间件产品,使用异步通信方式和发布订阅的消息传输模型。 Apache RocketMQ 产品具备异步通信的优势,系统拓扑简单、上下游耦合较弱,主要应用于异步解耦,流量削峰填谷等场景。消息队列的本质在于消息的发送、存储和接收。

# 4.RocketMQ 组成

Apache RocketMQ 中消息的生命周期主要分为消息生产、消息存储、消息消费这三部分。

生产者生产消息并发送至 Apache RocketMQ 服务端,消息被存储在服务端的主题中,消费者通过订阅主题消费消息。

领域模型

# 5.为什么选择 RocketMQ?

  • ActiveMQ IO 模块达到了一个瓶颈
  • kafka 不能满足低延迟和高可靠性

在阿里孕育 RocketMQ 的雏形时期,将其用于异步通信、搜索、社交网络活动流、数据管道,贸易流程中。随着贸易业务吞吐量的上升,源自消息传递集群的压力也变得紧迫。

根据研究,随着队列和虚拟主题使用的增加,ActiveMQ IO 模块达到了一个瓶颈。尽力通过节流、断路器或降级来解决这个问题,但效果并不理想。于是尝试了流行的消息传递解决方案 Kafka。不幸的是,Kafka 不能满足要求,尤其表现在低延迟和高可靠性方面。在这种情况下,我们决定发明一个新的消息传递引擎来处理更广泛的消息用例,覆盖从传统的 pub/sub 场景到高容量的实时零误差的交易系统。

Apache RocketMQ 自诞生以来,因其架构简单、业务功能丰富、具备极强可扩展性等特点被众多企业开发者以及云厂商广泛采用。历经十余年的大规模场景打磨,RocketMQ 已经成为业内共识的金融级可靠业务消息首选方案,被广泛应用于互联网、大数据、移动互联网、物联网等领域的业务场景。

# 6.架构图

  1. NameServer:在 MQ 集群中做的是做命名服务,更新和路由发现 broker 服务;
  2. Broker-Master:broker 消息主机服务器;
  3. Broker-Slave:broker 消息从机服务器;
  4. Producer:消息生产者;
  5. Consumer:消息消费者;

image-20230104170922612

RocketMQ 中的 broker 和 nameserver 是通过 netty 进行长链接的

image-20221223192800236

# 二.RocketMQ 概念

# 1.说说主题 Topic?

主题下面包含

  • 主题名称
  • 队列列表
  • 消息类型

Apache RocketMQ 中消息传输和存储的顶层容器,用于标识同一类业务逻辑的消息。主题通过 TopicName 来做唯一标识和区分。主题是一个逻辑概念,并不是实际的消息容器。

Apache RocketMQ 消息传输和存储的分组容器,主题内部由多个队列组成,消息的存储和水平扩展实际是通过主题内的队列实现的。

主题的作用

  • 定义数据的分类隔离: 在 Apache RocketMQ 的方案设计中,建议将不同业务类型的数据拆分到不同的主题中管理,通过主题实现存储的隔离性和订阅隔离性。
  • 定义数据的身份和权限: Apache RocketMQ 的消息本身是匿名无身份的,同一分类的消息使用相同的主题来做身份识别和权限管理。

# 2.消息队列

消息队列 MessageQueue:队列是 Apache RocketMQ 中消息存储和传输的实际容器,也是消息的最小存储单元。 Apache RocketMQ 的所有主题都是由多个队列组成,以此实现队列数量的水平拆分和队列内部的流式存储。队列通过 QueueId 来做唯一标识和区分。

队列的主要作用如下:

  • 存储顺序性

    队列天然具备顺序性,即消息按照进入队列的顺序写入存储,同一队列间的消息天然存在顺序关系,队列头部为最早写入的消息,队列尾部为最新写入的消息。消息在队列中的位置和消息之间的顺序通过位点(Offset)进行标记管理。

  • 流式操作语义

    Apache RocketMQ 基于队列的存储模型可确保消息从任意位点读取任意数量的消息,以此实现类似聚合读取、回溯读取等特性,这些特性是 RabbitMQ、ActiveMQ 等非队列存储模型不具备的。

image-20230813184607417

Apache RocketMQ 队列模型和 Kafka 的分区(Partition)模型类似。在 Apache RocketMQ 消息收发模型中,队列属于主题的一部分,虽然所有的消息资源以主题粒度管理,但实际的操作实现是面向队列。例如,生产者指定某个主题,向主题内发送消息,但实际消息发送到该主题下的某个队列中。

Apache RocketMQ 中通过修改队列数量,以此实现横向的水平扩容和缩容。

# 3.队列读写权限

  • 定义:当前队列是否可以读写数据。

  • 取值:由服务端定义,枚举值如下

    • 6:读写状态,当前队列允许读取消息和写入消息。
    • 4:只读状态,当前队列只允许读取消息,不允许写入消息。
    • 2:只写状态,当前队列只允许写入消息,不允许读取消息。
    • 0:不可读写状态,当前队列不允许读取消息和写入消息。
  • 约束:队列的读写权限属于运维侧操作,不建议频繁修改。

# 4.消息 Message

消息是 Apache RocketMQ 中的最小数据传输单元。生产者将业务数据的负载和拓展属性包装成消息发送到服务端,服务端按照相关语义将消息投递到消费端进行消费。

Apache RocketMQ 的消息模型具备如下特点:

  • 消息不可变性

    消息本质上是已经产生并确定的事件,一旦产生后,消息的内容不会发生改变。即使经过传输链路的控制也不会发生变化,消费端获取的消息都是只读消息视图。

  • 消息持久化

    Apache RocketMQ 会默认对消息进行持久化,即将接收到的消息存储到 Apache RocketMQ 服务端的存储文件中,保证消息的可回溯性和系统故障场景下的可恢复性。

消息的内部属性

内部属性 描述
主题名称 消息所属主题的名称
消息类型 消息的分类或类型
消息队列 消息所在的队列或通道
消息位点 消息在队列中的位置
消息 ID 每条消息的唯一标识符
索引 Key 列表 用于消息索引的关键字列表
过滤标签 Tag 附加在消息上的标签
定时时间 消息发送的预定时间
发送时间 消息实际发送的时间
消息保存时间戳 消息被保存的时间戳
消费重试次数 消息消费失败后的重试次数

# 5.消息类型

消息类型 MessageType:Apache RocketMQ 中按照消息传输特性的不同而定义的分类,用于类型管理和安全校验。 Apache RocketMQ 支持的消息类型有普通消息、顺序消息、事务消息和定时/延时消息。

消息类型

  • 定义:主题所支持的消息类型。
  • 取值:创建主题时选择消息类型。

Apache RocketMQ 支持的主题类型如下:

主题类型 描述
Normal 普通消息 (opens new window):消息本身无特殊语义,消息之间也没有任何关联。
FIFO 顺序消息 (opens new window):通过消息分组 MessageGroup 标记一组特定消息的先后顺序,保证消息的投递顺序严格按照消息发送时的顺序。
Delay 定时/延时消息 (opens new window):指定延时时间,消息在延时间隔后才对消费者可见。
Transaction 事务消息 (opens new window):支持分布式事务消息,保障应用数据库更新和消息调用的事务一致性。

# 6.消息视图

消息视图 MessageView

消息视图是 Apache RocketMQ 面向开发视角提供的一种消息只读接口。通过消息视图可以读取消息内部的多个属性和负载信息,但是不能对消息本身做任何修改。

# 7.消息标签

消息标签 MessageTag

消息标签是 Apache RocketMQ 提供的细粒度消息分类属性,可以在主题层级之下做消息类型的细分。消费者通过订阅特定的标签来实现细粒度过滤

# 8.消息位点

消息位点(MessageQueueOffset):消息是按到达 Apache RocketMQ 服务端的先后顺序存储在指定主题的多个队列中,每条消息在队列中都有一个唯一的 Long 类型坐标,这个坐标被定义为消息位点。

# 9.消费位点

消费位点(ConsumerOffset):一条消息被某个消费者消费完成后不会立即从队列中删除,Apache RocketMQ 会基于每个消费者分组记录消费过的最新一条消息的位点,即消费位点。

# 10.消息索引

消息索引 MessageKey

消息索引是 Apache RocketMQ 提供的面向消息的索引属性。通过设置的消息索引可以快速查找到对应的消息内容。

# 11.生产者

生产者 Producer 是 Apache RocketMQ 系统中用来构建并传输消息到服务端的运行实体。生产者通常被集成在业务系统中,将业务消息按照要求封装成消息并发送至服务端。

在消息生产者中,可以定义如下传输行为:

  • 发送方式:生产者可通过 API 接口设置消息发送的方式。Apache RocketMQ 支持同步传输和异步传输。
  • 批量发送:生产者可通过 API 接口设置消息批量传输的方式。例如,批量发送的消息条数或消息大小。
  • 事务行为:Apache RocketMQ 支持事务消息,对于事务消息需要生产者配合进行事务检查等行为保障事务的最终一致性。

# 12.消费者

消费者 Consumer 是 Apache RocketMQ 中用来接收并处理消息的运行实体。消费者通常被集成在业务系统中,从服务端获取消息,并将消息转化成业务可理解的信息,供业务逻辑处理。

在消息消费端,可以定义如下传输行为:

  • 消费者身份:消费者必须关联一个指定的消费者分组,以获取分组内统一定义的行为配置和消费状态。
  • 消费者类型:Apache RocketMQ 面向不同的开发场景提供了多样的消费者类型,包括 PushConsumer 类型、SimpleConsumer 类型、PullConsumer 类型(仅推荐流处理场景使用)等。
  • 消费者本地运行配置:消费者根据不同的消费者类型,控制消费者客户端本地的运行配置。例如消费者客户端的线程数,消费并发度等,实现不同的传输效果。

# 13.消费者分组

消费者分组 ConsumerGroup

消费者分组是 Apache RocketMQ 系统中承载多个消费行为一致的消费者的负载均衡分组。和消费者不同,消费者分组并不是运行实体,而是一个逻辑资源。在 Apache RocketMQ 中,通过消费者分组内初始化多个消费者实现消费性能的水平扩展以及高可用容灾。

image-20231219111820493

# 14.订阅关系

订阅关系 Subscription 是 Apache RocketMQ 系统中消费者获取消息、处理消息的规则和状态配置。订阅关系由消费者分组动态注册到服务端系统,并在后续的消息传输中按照订阅关系定义的过滤规则进行消息匹配和消费进度维护。

通过配置订阅关系,可控制如下传输行为:

  • 消息过滤规则:用于控制消费者在消费消息时,选择主题内的哪些消息进行消费,设置消费过滤规则可以高效地过滤消费者需要的消息集合,灵活根据不同的业务场景设置不同的消息接收范围。
  • 消费状态:Apache RocketMQ 服务端默认提供订阅关系持久化的能力,即消费者分组在服务端注册订阅关系后,当消费者离线并再次上线后,可以获取离线前的消费进度并继续消费。

# 15.事务检查器

事务检查器(TransactionChecker

Apache RocketMQ 中生产者用来执行本地事务检查和异常事务恢复的监听器。事务检查器应该通过业务侧数据的状态来检查和判断事务消息的状态。

# 16.事务状态

事务状态(TransactionResolution

Apache RocketMQ 中事务消息发送过程中,事务提交的状态标识,服务端通过事务状态控制事务消息是否应该提交和投递。事务状态包括事务提交、事务回滚和事务未决。

# 17.分布式通信方式

分布式系统架构思想下,将复杂系统拆分为多个独立的子模块,例如微服务模块。此时就需要考虑子模块间的远程通信,典型的通信模式分为以下两种,

  • 一种是同步的 RPC 远程调用;
  • 一种是基于中间件代理的异步通信方式。

异步通信的优势如下:

  • 系统拓扑简单由于调用方和被调用方统一和中间代理通信,系统是星型结构,易于维护和管理。

  • 上下游耦合性弱上下游系统之间弱耦合,结构更灵活,由中间代理负责缓冲和异步恢复。 上下游系统间可以独立升级和变更,不会互相影响。

  • 容量削峰填谷基于消息的中间代理往往具备很强的流量缓冲和整形能力,业务流量高峰到来时不会击垮下游。

# 18.点对点模型

点对点模型也叫队列模型,具有如下特点:

  • 消费匿名:消息上下游沟通的唯一的身份就是队列,下游消费者从队列获取消息无法申明独立身份。
  • 一对一通信:基于消费匿名特点,下游消费者即使有多个,但都没有自己独立的身份,因此共享队列中的消息,每一条消息都只会被唯一一个消费者处理。因此点对点模型只能实现一对一通信。

image-20230813185638074

# 19.发布订阅模型

发布订阅模型具有如下特点:

  • 消费独立:相比队列模型的匿名消费方式,发布订阅模型中消费方都会具备的身份,一般叫做订阅组(订阅关系),不同订阅组之间相互独立不会相互影响。
  • 一对多通信:基于独立身份的设计,同一个主题内的消息可以被多个订阅组处理,每个订阅组都可以拿到全量消息。因此发布订阅模型可以实现一对多通信。

# 20.NameServer 作用

相当于 kafka 中的 zookeeper,NameServer 是用作 RocketMQ 的路由中心来使用的。

  • NameServer 互相独立,彼此没有通信关系,单台 NameServer 挂掉,不影响其他 NameServer。
  • NameServer 不去连接别的机器,不主动推消息。
  • 单个 Broker(Master、Slave)与所有 NameServer 进行定时注册,以便告知 NameServer 自己还活着。 Broker 每隔 30 秒向所有 NameServer 发送心跳,心跳包含了自身的 topic 配置信息。NameServer 每隔 10 秒,扫描所有还存活的 broker 连接,如果某个连接的最后更新时间与当前时间差值超过 2 分钟,则断开此连接,NameServer 也会断开此 broker 下所有与 slave 的连接。同时更新 topic 与队列的对应关系,但不通知生产者和消费者。Broker slave 同步或者异步从 Broker master上拷贝数据。
  • Consumer 随机与一个 NameServer 建立长连接,如果该 NameServer 断开,则从 NameServer 列表中查找下一个进行连接。 Consumer 主要从 NameServer 中根据 Topic 查询 Broker 的地址,查到就会缓存到客户端,并向提供 Topic 服务的 Master、Slave 建立长连接,且定时向 Master、Slave 发送心跳。如果 Broker 宕机,则 NameServer 会将其剔除,而 Consumer 端的定时任务 MQClientInstance.this.updateTopicRouteInfoFromNameServer 每 30 秒执行一次,将 Topic 对应的 Broker 地址拉取下来,此地址只有 Slave 地址了,此时 Consumer 从 Slave 上消费。消费者与 Master 和 Slave 都建有连接,在不同场景有不同的消费规则。
  • Producer 随机与一个 NameServer 建立长连接,每隔 30 秒(此处时间可配置)从 NameServer 获取 Topic 的最新队列情况,如果某个 Broker Master 宕机,Producer 最多 30 秒才能感知,在这个期间,发往该 broker master 的消息失败。Producer 向提供 Topic 服务的 Master 建立长连接,且定时向 Master 发送心跳。生产者与所有的 master 连接,但不能向 slave 写入。客户端是先从 NameServer 寻址的,得到可用 Broker 的 IP 和端口信息,然后据此信息连接 broker。

综上所述,NameServer 在 RocketMQ 中的作用:

  1. NameServer 用来保存活跃的 broker 列表,包括 Master 和 Slave 。
  2. NameServer 用来保存所有 topic 和该 topic 所有队列的列表。
  3. NameServer 用来保存所有 broker 的 Filter 列表。
  4. 命名服务器为客户端,包括生产者消费者命令行客户端提供最新的路由信息。

# 三.特性特征

# 1.普通消息

普通消息一般应用于微服务解耦、事件驱动、数据集成等场景,这些场景大多数要求数据传输通道具有可靠传输的能力,且对消息的处理时机、处理顺序没有特别要求。

  • 微服务异步解耦
  • 数据集成传输

image-20230813190021342

image-20230813190049852

普通消息生命周期

  • 初始化:消息被生产者构建并完成初始化,待发送到服务端的状态。
  • 待消费:消息被发送到服务端,对消费者可见,等待消费者消费的状态。
  • 消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。 此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,Apache RocketMQ 会对消息进行重试处理。
  • 消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。 Apache RocketMQ 默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。
  • 消息删除:Apache RocketMQ 按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。

# 2.定时/延时消息

在分布式定时调度触发、任务超时处理等场景,需要实现精准、可靠的定时事件触发。使用 Apache RocketMQ 的定时消息可以简化定时调度任务的开发逻辑,实现高性能、可扩展、高可靠的定时触发能力。

定时消息是 Apache RocketMQ 提供的一种高级消息类型,消息被发送至服务端后,在指定时间后才能被消费者消费。通过设置一定的定时时间可以实现分布式场景的延时调度触发效果。

定时时间设置原则

  • Apache RocketMQ 定时消息设置的定时时间是一个预期触发的系统时间戳,延时时间也需要转换成当前系统时间后的某一个时间戳,而不是一段延时时长。
  • 定时时间的格式为毫秒级的 Unix 时间戳,您需要将要设置的时刻转换成时间戳形式。
  • 定时时间必须设置在定时时长范围内,超过范围则定时不生效,服务端会立即投递消息。
  • 定时时长最大值默认为 24 小时,不支持自定义修改
  • 定时时间必须设置为当前时间之后,若设置到当前时间之前,则定时不生效,服务端会立即投递消息。

多了一个状态定时中

定时中:消息被发送到服务端,和普通消息不同的是,服务端不会直接构建消息索引,而是会将定时消息单独存储在定时存储系统中,等待定时时刻到达。

# 3.顺序消息

顺序消息是 Apache RocketMQ 提供的一种高级消息类型,支持消费者按照发送消息的先后顺序获取消息,从而实现业务场景中的顺序处理。 相比其他类型消息,顺序消息在发送、存储和投递的处理过程中,更多强调多条消息间的先后顺序关系。

Apache RocketMQ 顺序消息的顺序关系通过消息组(MessageGroup)判定和识别,发送顺序消息时需要为每条消息设置归属的消息组,相同消息组的多条消息之间遵循先进先出的顺序关系,不同消息组、无消息组的消息之间不涉及顺序性。

基于消息组的顺序判定逻辑,支持按照业务逻辑做细粒度拆分,可以在满足业务局部顺序的前提下提高系统的并行度和吞吐能力。

如何保证消息的顺序性

Apache RocketMQ 的消息的顺序性分为两部分,生产顺序性和消费顺序性。

生产顺序性:

Apache RocketMQ 通过生产者和服务端的协议保障单个生产者串行地发送消息,并按序存储和持久化。

如需保证消息生产的顺序性,则必须满足以下条件:

  • 单一生产者:消息生产的顺序性仅支持单一生产者,不同生产者分布在不同的系统,即使设置相同的消息组,不同生产者之间产生的消息也无法判定其先后顺序。
  • 串行发送:Apache RocketMQ 生产者客户端支持多线程安全访问,但如果生产者使用多线程并行发送,则不同线程间产生的消息将无法判定其先后顺序。

满足以上条件的生产者,将顺序消息发送至 Apache RocketMQ 后,会保证设置了同一消息组的消息,按照发送顺序存储在同一队列中。服务端顺序存储逻辑如下:

  • 相同消息组的消息按照先后顺序被存储在同一个队列。
  • 不同消息组的消息可以混合在同一个队列中,且不保证连续。

消费顺序性:

Apache RocketMQ 通过消费者和服务端的协议保障消息消费严格按照存储的先后顺序来处理。

如需保证消息消费的顺序性,则必须满足以下条件:

  • 投递顺序:Apache RocketMQ 通过客户端 SDK 和服务端通信协议保障消息按照服务端存储顺序投递,但业务方消费消息时需要严格按照接收---处理---应答的语义处理消息,避免因异步处理导致消息乱序。
  • 有限重试:Apache RocketMQ 顺序消息投递仅在重试次数限定范围内,即一条消息如果一直重试失败,超过最大重试次数后将不再重试,跳过这条消息消费,不会一直阻塞后续消息处理。

对于需要严格保证消费顺序的场景,请务设置合理的重试次数,避免参数不合理导致消息乱序。

image-20221223185705208

# 4.事务消息

事务消息是 Apache RocketMQ 提供的一种高级消息类型,支持在分布式场景下保障消息生产和本地事务的最终一致性。

事务消息处理流程

事务消息

  1. 生产者将消息发送至 Apache RocketMQ 服务端。
  2. Apache RocketMQ 服务端将消息持久化成功之后,向生产者返回 Ack 确认消息已经发送成功,此时消息被标记为"暂不能投递",这种状态下的消息即为半事务消息
  3. 生产者开始执行本地事务逻辑。
  4. 生产者根据本地事务执行结果向服务端提交二次确认结果Commit 或是 Rollback),服务端收到确认结果后处理逻辑如下:
    • 二次确认结果为 Commit:服务端将半事务消息标记为可投递,并投递给消费者。
    • 二次确认结果为 Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。
  5. 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为 Unknown 未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。
  6. 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
  7. 生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤 4 对半事务消息进行处理。

事务消息生命周期 事务消息

  • 初始化:半事务消息被生产者构建并完成初始化,待发送到服务端的状态。
  • 事务待提交:半事务消息被发送到服务端,和普通消息不同,并不会直接被服务端持久化,而是会被单独存储到事务存储系统中,等待第二阶段本地事务返回执行结果后再提交。此时消息对下游消费者不可见。
  • 消息回滚:第二阶段如果事务执行结果明确为回滚,服务端会将半事务消息回滚,该事务消息流程终止。
  • 提交待消费:第二阶段如果事务执行结果明确为提交,服务端会将半事务消息重新存储到普通存储系统中,此时消息对下游消费者可见,等待被消费者获取并消费。
  • 消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。 此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,Apache RocketMQ 会对消息进行重试处理。
  • 消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。 Apache RocketMQ 默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。
  • 消息删除:Apache RocketMQ 按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。

# 5.重试机制

Apache RocketMQ 客户端连接服务端发起消息发送请求时,可能会因为网络故障、服务异常等原因导致调用失败。为保证消息的可靠性, Apache RocketMQ 在客户端 SDK 中内置请求重试逻辑,尝试通过重试发送达到最终调用成功的效果。

重试触发条件

触发消息发送重试机制的条件如下:

  • 客户端消息发送请求调用失败或请求超时
  • 网络异常造成连接失败或请求超时。
  • 服务端节点处于重启或下线等状态造成连接失败。
  • 服务端运行慢造成请求超时。
  • 服务端返回失败错误码
    • 系统逻辑错误:因运行逻辑不正确造成的错误。
    • 系统流控错误:因容量超限造成的流控错误。

重试间隔

  • 除服务端返回系统流控错误场景,其他触发条件触发重试后,均会立即进行重试,无等待间隔。
  • 若由于服务端返回流控错误触发重试,系统会按照指数退避策略进行延迟重试。指数退避算法通过以下参数控制重试行为:
    • INITIAL_BACKOFF: 第一次失败重试前后需等待多久,默认值:1 秒。
    • MULTIPLIER :指数退避因子,即退避倍率,默认值:1.6。
    • JITTER :随机抖动因子,默认值:0.2。
    • MAX_BACKOFF :等待间隔时间上限,默认值:120 秒
    • MIN_CONNECT_TIMEOUT :最短重试间隔,默认值:20 秒。

# 6.消息流控机制

消息流控指的是系统容量或水位过高, Apache RocketMQ 服务端会通过快速失败返回流控错误来避免底层资源承受过高压力。

Apache RocketMQ 的消息流控触发条件如下:

  • 存储压力大:参考消费进度管理的原理机制,消费者分组的初始消费位点为当前队列的最大消费位点。若某些场景例如业务上新等需要回溯到指定时刻前开始消费,此时队列的存储压力会瞬间飙升,触发消息流控。
  • 服务端请求任务排队溢出:若消费者消费能力不足,导致队列中有大量堆积消息,当堆积消息超过一定数量后会触发消息流控,减少下游消费系统压力。

# 7.消费者分类

Apache RocketMQ 支持 PushConsumerSimpleConsumer 以及 PullConsumer 这三种类型的消费者

在实际使用场景中,PullConsumer 仅推荐在流处理框架中集成使用,大多数消息收发场景使用 PushConsumerSimpleConsumer 就可以满足需求。

生产环境中相同的 ConsumerGroup 下严禁混用 PullConsumer 和其他两种消费者,否则会导致消息消费异常。

image-20221223190340086

# 8.消息过滤

Tag 标签设置

  • Tag 由生产者发送消息时设置,每条消息允许设置一个 Tag 标签。
  • Tag 使用可见字符,建议长度不超过 128 字符。

Tag 标签过滤规则

Tag 标签过滤为精准字符串匹配,过滤规则设置格式如下:

  • 单 Tag 匹配:过滤表达式为目标 Tag。表示只有消息标签为指定目标 Tag 的消息符合匹配条件,会被发送给消费者。
  • 多 Tag 匹配:多个 Tag 之间为或的关系,不同 Tag 间使用两个竖线(||)隔开。例如,Tag1||Tag2||Tag3,表示标签为 Tag1 或 Tag2 或 Tag3 的消息都满足匹配条件,都会被发送给消费者进行消费。
  • 全部匹配:使用星号(*)作为全匹配表达式。表示主题下的所有消息都将被发送给消费者进行消费。

# 9.消费者负载均衡

如上文所述,消费组间广播消费场景下,每个消费者分组内只有一个消费者,因此不涉及消费者的负载均衡。

消费组内共享消费场景下,消费者分组内多个消费者共同分担消息,消息按照哪种逻辑分配给哪个消费者,就是由消费者负载均衡策略所决定的。

根据消费者类型的不同,消费者负载均衡策略分为以下两种模式:

  • 消息粒度负载均衡:PushConsumer 和 SimpleConsumer 默认负载策略
  • 队列粒度负载均衡:PullConsumer 默认负载策略

消息粒度负载均衡策略中,同一消费者分组内的多个消费者将按照消息粒度平均分摊主题中的所有消息,即同一个队列中的消息,可被平均分配给多个消费者共同消费。

队列粒度负载均衡策略中,同一消费者分组内的多个消费者将按照队列粒度消费消息,即每个队列仅被一个消费者消费。

默认使用 Round Robin 算法来选择下一个消费者来处理消息,也就是默认的负载均衡算法是轮询.

# 10.消费进度管理

消息位点(Offset)

参考 Apache RocketMQ 主题和队列的定义,消息是按到达服务端的先后顺序存储在指定主题的多个队列中,每条消息在队列中都有一个唯一的 Long 类型坐标,这个坐标被定义为消息位点。

任意一个消息队列在逻辑上都是无限存储,即消息位点会从 0 到 Long.MAX 无限增加。通过主题、队列和位点就可以定位任意一条消息的位置,

Apache RocketMQ 定义队列中最早一条消息的位点为最小消息位点(MinOffset);最新一条消息的位点为最大消息位点(MaxOffset)。虽然消息队列逻辑上是无限存储,但由于服务端物理节点的存储空间有限, Apache RocketMQ 会滚动删除队列中存储最早的消息。因此,消息的最小消费位点和最大消费位点会一直递增变化。

消费位点(ConsumerOffset)

Apache RocketMQ 领域模型为发布订阅模式,每个主题的队列都可以被多个消费者分组订阅。若某条消息被某个消费者消费后直接被删除,则其他订阅了该主题的消费者将无法消费该消息。

因此,Apache RocketMQ 通过消费位点管理消息的消费进度。每条消息被某个消费者消费完成后不会立即在队列中删除,Apache RocketMQ 会基于每个消费者分组维护一份消费记录,该记录指定消费者分组消费某一个队列时,消费过的最新一条消息的位点,即消费位点。

当消费者客户端离线,又再次重新上线时,会严格按照服务端保存的消费进度继续处理消息。如果服务端保存的历史位点信息已过期被删除,此时消费位点向前移动至服务端存储的最小位点。

消费位点的保存和恢复是基于 Apache RocketMQ 服务端的存储实现,和任何消费者无关。因此 Apache RocketMQ 支持跨消费者的消费进度恢复。

重置消费位点

若消费者分组的初始消费位点或当前消费位点不符合您的业务预期,您可以通过重置消费位点调整您的消费进度。

适用场景

  • 初始消费位点不符合需求:因初始消费位点为当前队列的最大消息位点,即客户端会直接从最新消息开始消费。若业务上线时需要消费部分历史消息,您可以通过重置消费位点功能消费到指定时刻前的消息。
  • 消费堆积快速清理:当下游消费系统性能不足或消费速度小于生产速度时,会产生大量堆积消息。若这部分堆积消息可以丢弃,您可以通过重置消费位点快速将消费位点更新到指定位置,绕过这部分堆积的消息,减少下游处理压力。
  • 业务回溯,纠正处理:由于业务消费逻辑出现异常,消息被错误处理。若您希望重新消费这些已被处理的消息,可以通过重置消费位点快速将消费位点更新到历史指定位置,实现消费回溯。

# 11.消息存储

Apache RocketMQ 使用存储时长作为消息存储的依据,即每个节点对外承诺消息的存储时长。在存储时长范围内的消息都会被保留,无论消息是否被消费;超过时长限制的消息则会被清理掉。

消息存储机制主要定义以下关键问题:

  • 消息存储管理粒度:Apache RocketMQ 按存储节点管理消息的存储时长,并不是按照主题或队列粒度来管理。
  • 消息存储判断依据:消息存储按照存储时间作为判断依据,相对于消息数量、消息大小等条件,使用存储时间作为判断依据,更利于业务方对消息数据的价值进行评估。
  • 消息存储和是否消费状态有关:Apache RocketMQ 的消息存储是按照消息的生产时间计算,和消息是否被消费无关。按照统一的计算策略可以有效地简化存储机制。

Apache RocketMQ 按照服务端节点粒度管理存储时长而非队列或主题,原因如下:

  • 消息存储优势权衡:Apache RocketMQ 基于统一的物理日志队列和轻量化逻辑队列的二级组织方式,管理物理数据。这种机制可以带来顺序读写、高吞吐、高性能等优势,但缺点是不支持按主题和队列单独管理。
  • 安全生产和容量保障风险要求:即使 Apache RocketMQ 按照主题或者队列独立生成存储文件,但存储层本质还是共享存储介质。单独根据主题或队列控制存储时长,这种方式看似更灵活,但实际上整个集群仍然存在容量风险,可能会导致存储时长 SLA 被打破。从安全生产角度考虑,最合理的方式是将不同存储时长的消息通过不同集群进行分离治理。

消息存储和消费状态关系说明

Apache RocketMQ 统一管理消息的存储时长,无论消息是否被消费。

当消费者不在线或消息消费异常时,会造成队列中大量消息堆积,且该现象暂时无法有效控制。若此时按照消费状态考虑将未消费的消息全部保留,则很容易导致存储空间不足,进而影响到新消息的读写速度。

根据统一地存储时长管理消息,可以帮助消费者业务清晰地判断每条消息的生命周期。只要消息在有效期内可以随时被消费,或通过重置消费位点功能使消息可被消费多次。

# 12.master 选举

Raft 算法进行 broker 的选举

  1. 3 个 Broker 启动的时候会投票给自己,然后把自己的投票情况发送给别人
  2. 第一轮选举是失败的,因为都把票投给了自己,并没有选出一个 leader
  3. 接着 3 个 Broker 睡眠随机时间,broker01 睡眠 3S,broker02 睡眠 5S,broker03 睡眠 4S
  4. broker01 先醒过来,然后投票给自己,然后把自己的投票情况发送给别人,然后 broker03 醒过来,发现了 broker01 投给了 broker01,而 broker03 此时没投票,所以 broker03 投给了 broker01。最后 broker02 醒过来,发现 broker01 投给了 broker01,broker03 也投给了 broker01,所以自己也投给了 broker01
  5. 如果 broker 获得了(机器数量/2)+1 票,那就成为 leader,只有 leader 能接受消息写,follower 只能接受 leader 同步过来的数据

RocketMQ 中,每个 Broker 都有一个 Master 节点和多个 Slave 节点。Master 节点负责管理消息的写入和消费者的注册,而 Slave 节点则负责数据的备份和读取。当 Master 节点宕机或网络故障时,需要进行 Master 选举,以保证系统的高可用性。

RocketMQ 的 Master 选举过程如下:

  1. Master 节点宕机或网络故障时,Broker 会检测到 Master 节点的状态变化,并触发 Master 选举过程。

  2. Broker 会将所有可用的 Slave 节点按照一定的规则(例如 ID 的大小)排序,并选取第一个 Slave 节点作为新的 Master 节点。

  3. 新的 Master 节点会向所有 Slave 节点发送一条“同步请求”,要求它们将未同步的数据全部发送给新的 Master 节点。

  4. 当所有 Slave 节点的数据同步完成后,新的 Master 节点会向 NameServer 发送一条“注册 Broker”请求,以更新 Broker 的状态信息。

  5. NameServer 收到新的 Master 节点的注册请求后,会将其状态更新为 Master,并通知所有消费者和生产者该 Broker 的状态变化。

  6. 如果新的 Master 节点在一定时间内没有发送“心跳”信息或与 NameServer 失去连接,NameServer 会将其状态更新为 DOWN,并触发 Master 选举过程。

RocketMQ 的 Master 选举过程是一个自动化的过程,能够在 Master 节点故障时快速地选举出新的 Master 节点,并保证消息系统的高可用性。

# 13.消费者的 Rebalance

RocketMQ 的消费者 Rebalance 过程如下:

  1. 首先,消费者启动时会从 NameServer 获取所有的 Broker 信息,包括 Broker 的名称、地址、角色(Master 或 Slave)等。

  2. 然后,消费者会根据订阅的 TopicTagsNameServer 获取所有可用的消息队列,包括每个消息队列所在的 Broker、队列 ID 等信息。

  3. 接下来,消费者会根据消费者组名和消费者 ID 生成一个唯一的消费者标识符,用于在 Broker 端标识该消费者。

  4. 消费者将所有可用的消息队列按照一定的规则(默认为平均分配)分配给各个消费者线程,以实现消息的负载均衡。分配规则可以通过实现 Rebalance 算法接口来自定义。

  5. 当消费者线程启动时,它会向 Broker 发送一条“注册消费者”请求,包括消费者标识符、订阅的 Topic 和 Tags、消费模式(集群消费或广播消费)等信息。

  6. Broker 接收到消费者的注册请求后,会将消费者信息保存在内存中,并返回该消费者需要消费的消息队列列表。

  7. 如果消费者发现自己被分配到的消息队列发生了变化(例如新增或减少了消息队列),它会再次触发 Rebalance 过程,重新分配消息队列。这个过程会在消费者定时任务中定期执行。

  8. 当消费者从 Broker 拉取到消息后,会将消息提交到本地缓存中,并向 Broker 发送一条“消息消费完成”请求。Broker 收到该请求后会将消息标记为已消费,以保证消息不会被重复消费。

  9. 如果消费者在消费消息的过程中发生了异常(例如网络故障、消息处理失败等),它会向 Broker 发送一条“消息消费失败”请求,以告知 Broker 该消息需要重新消费。

RocketMQ 的消费者 Rebalance 过程是一个动态的过程,会根据消费者的启动、关闭、新增、删除等事件来动态调整消息队列的分配,以实现消费者的负载均衡和高可用性。

RocketMQ中有6个队列分给了3个消费者,新加了一个消费者,如何rebalance:

在 RocketMQ 中,当新增一个消费者时,可以通过重平衡(rebalance)来重新分配队列。重平衡是指重新计算每个消费者应该消费哪些队列,以达到负载均衡的目的。

具体步骤如下:

  1. 停止所有的消费者。

  2. 修改消费者组的消费者数量,将新的消费者数量提交到 NameServer

  3. 启动所有的消费者。

  4. 等待一段时间,让 NameServer 通知所有的消费者进行重平衡。

在重平衡过程中,NameServer 会计算每个消费者应该消费哪些队列,并将分配结果发送给每个消费者。消费者会根据这个结果重新分配队列并开始消费消息。重平衡过程可能需要一些时间,具体时间取决于消费者组的大小和网络延迟等因素。需要注意的是,重平衡可能会导致消费者的消息消费进度发生变化,因此在进行重平衡之前需要做好相关的数据备份和恢复工作

# 14.动态更新消费者?

在 RocketMQ 中,更新消费者(或者称之为 Rebalance)通常是一个需要小心处理的操作,以确保消息消费的稳定性和数据一致性。在一些情况下,确实需要停机来进行 Rebalance,但也有一些方法可以实现动态的消费者更新而无需停机。以下是一些常见的方法:

  1. 消费者组名变更:如果你希望新增消费者,可以创建一个新的消费者组,并将新的消费者加入该组。然后,逐步启动新消费者,让其开始消费消息。在这个过程中,旧的消费者组继续工作,确保消息不会丢失。待新消费者完全就绪后,你可以停止旧消费者组的消费者。这种方法不会导致消息的中断,但需要确保消息消费的顺序和幂等性。

  2. 动态调整消费者线程数量:RocketMQ 允许在不停机的情况下调整消费者线程数量。你可以通过修改消费者的配置来调整线程数量,然后重新启动消费者线程。这样可以在不停机的情况下实现消费者的动态更新。

  3. 增加消费者并重新平衡:如果你希望增加消费者并且希望消息能够均匀地分配给新消费者,可以通过执行 Rebalance 操作来实现。RocketMQ 提供了 Rebalance 的命令,可以手动触发 Rebalance 操作,以确保消息分配均匀。虽然这可能会导致短暂的消息处理停顿,但可以避免完全停机。

需要注意的是,不同的应用场景可能需要不同的策略。在进行消费者更新时,一定要谨慎操作,确保消息不会丢失,消费者不会重复消费,以及整个系统的稳定性。

# 四.集群相关

# 1.各角色介绍

  • Producer:消息的发送者;举例:发信者
  • Consumer:消息接收者;举例:收信者
  • Broker:暂存和传输消息;举例:邮局
  • NameServer:管理 Broker;举例:各个邮局的管理机构
  • Topic:区分消息的种类;一个发送者可以发送消息给一个或者多个 Topic;一个消息的接收者可以订阅一个或者多个 Topic 消息
  • Message Queue:相当于是 Topic 的分区;用于并行发送和接收消息

image-20230602161456487

# 2.集群特点

  • NameServer 是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。

  • Broker 部署相对复杂,Broker 分为 MasterSlave,一个 Master 可以对应多个 Slave,但是一个 Slave 只能对应一个 Master,Master 与 Slave 的对应关系通过指定相同的 BrokerName,不同的 BrokerId 来定义,BrokerId 为 0 表示 Master,非 0 表示 Slave。Master 也可以部署多个。每个 Broker 与 NameServer 集群中的所有节点建立长连接,定时注册 Topic 信息到所有 NameServer

  • ProducerNameServer 集群中的其中一个节点(随机选择)建立长连接,定期从 NameServer 取 Topic 路由信息,并向提供 Topic 服务的 Master 建立长连接,且定时向 Master 发送心跳。Producer 完全无状态,可集群部署。

  • ConsumerNameServer 集群中的其中一个节点(随机选择)建立长连接,定期从 NameServer 取 Topic 路由信息,并向提供 Topic 服务的 Master、Slave 建立长连接,且定时向 Master、Slave 发送心跳。Consumer 既可以从 Master 订阅消息,也可以从 Slave 订阅消息,订阅规则由 Broker 配置决定。

# 3.集群模式

# 1.单 Master

这种方式风险较大,一旦 Broker 重启或者宕机时,会导致整个服务不可用。不建议线上环境使用,可以用于本地测试。

# 2.多 Master

一个集群无 Slave,全是 Master,例如 2 个 Master 或者 3 个 Master,这种模式的优缺点如下:

  • 优点:配置简单,单个 Master 宕机或重启维护对应用无影响,在磁盘配置为 RAID10 时,即使机器宕机不可恢复情况下,由于 RAID10 磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高;
  • 缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。

# 3.多 Master 多 Slave

每个 Master 配置一个 Slave,有多对 Master-Slave,HA 采用异步复制方式,主备有短暂消息延迟(毫秒级),这种模式的优缺点如下:

  • 优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,同时 Master 宕机后,消费者仍然可以从 Slave 消费,而且此过程对应用透明,不需要人工干预,性能同多 Master 模式几乎一样;
  • 缺点:Master 宕机,磁盘损坏情况下会丢失少量消息。

# 4.多 Master 多 Slave 双写模式

每个 Master 配置一个 Slave,有多对 Master-Slave,HA 采用同步双写方式,即只有主备都写成功,才向应用返回成功,这种模式的优缺点如下:

  • 优点:数据与服务都无单点故障,Master 宕机情况下,消息无延迟,服务可用性与数据可用性都非常高;
  • 缺点:性能比异步复制模式略低(大约低 10%左右),发送单个消息的 RT 会略高,且目前版本在主节点宕机后,备机不能自动切换为主机。

# 4.集群工作流程

  1. 启动 NameServerNameServer 起来后监听端口,等待 BrokerProducerConsumer 连上来,相当于一个路由控制中心。
  2. Broker 启动,跟所有的 NameServer 保持长连接,定时发送心跳包。心跳包中包含当前 Broker 信息(IP+端口等)以及存储所有 Topic 信息。注册成功后,NameServer 集群中就有 TopicBroker 的映射关系。
  3. 收发消息前,先创建 Topic,创建 Topic 时需要指定该 Topic 要存储在哪些 Broker 上,也可以在发送消息时自动创建 Topic。
  4. Producer 发送消息,启动时先跟 NameServer 集群中的其中一台建立长连接,并从 NameServer 中获取当前发送的 Topic 存在哪些 Broker 上,轮询从队列列表中选择一个队列,然后与队列所在的 Broker 建立长连接从而向 Broker 发消息。
  5. Consumer 跟 Producer 类似,跟其中一台 NameServer 建立长连接,获取当前订阅 Topic 存在哪些 Broker 上,然后直接跟 Broker 建立连接通道,开始消费消息。

# 5.动态添加 NameServer

要动态添加 RocketMQNameServer,您需要进行以下步骤:

  1. 在您的 RocketMQ 的安装目录下,找到 conf 文件夹,里面有一个 broker.conf 文件。
  2. 打开 broker.conf 文件,找到 namesrvAddr 属性,它可能看起来像这样:namesrvAddr=192.168.0.1:9876;192.168.0.2:9876
  3. 在 namesrvAddr 属性中添加新的 nameserver 的 IP 地址和端口号,例如:namesrvAddr=192.168.0.1:9876;192.168.0.2:9876;192.168.0.3:9876
  4. 保存 broker.conf 文件并重启 RocketMQ Broker 服务。

在重启后,RocketMQ Broker 将会动态地向新的 NameServer 注册并开始使用它。请注意,如果您使用的是 RocketMQ 的集群模式,则需要在所有 Broker 节点上重复这些步骤,以确保所有节点都可以使用新的 NameServer

# 6.Topic 和 broker 的关系

Topic 是消息的逻辑概念,它代表了一类消息的集合,可以理解为一个主题或者一个频道。而 Broker 则是消息的实际存储和传递的节点,它负责接收来自生产者的消息,存储消息并将消息传递给消费者。

RocketMQ 中,每个 Topic 都需要被分配到一个或多个 Broker 上进行存储和传递。这些 Broker 被称为 Topic 的路由信息,它们负责处理该 Topic 下的所有消息。当一个生产者发送一条消息到一个 Topic 时,RocketMQ 会根据该 Topic 的路由信息将消息发送到相应的 Broker 上进行存储和传递。当消费者订阅该 Topic 时,RocketMQ 会从该 Topic 的所有 Broker 中获取消息并将其传递给消费者。

RocketMQ 中,Broker 和 Topic 之间的关系是动态的,可以通过调整 BrokerTopic 的路由信息来实现负载均衡和容错。例如,当一个 Broker 出现故障或负载过高时,可以通过重新分配 Topic 的路由信息来将消息转移到其他 Broker 上,以确保消息的可靠传递。

# 7.RocketMQ 高可用

RocketMQ 高可用有很多种方式,比如:单机部署,多主集群,双主双从同步部署,双主双从异步部署,以及多主多从部署。部署集群可按照自己公司的实际情况进行部署。

单机部署:只启动一个 rocketMQ 实例就可以了,一般常用来本机测试使用。原因:一旦 rocketMQ 因某些原因挂掉,导致 mq 无法使用,则我们服务器无法接受信息与消费信息等。

多主集群:只部署 mq 主节点,无部署从节点。优点:配置简单,单个 Master 宕机或重启维护对应用无影响,即使机器宕机不可恢复情况下,也有其他主节点进行写入操作,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高;

双主双从同步:部署四个节点,每个主节点都有一个从节点,主与从节点的信息通过同步的方式进行保存。优点:消息不会丢失,即:主节点挂了后,从节点的消息也不会造成丢失,只不过没法接受新消息,只能消费,但是能保证我消费的消息一定是准确的。缺点:客户端接受服务器响应时间长。可用于消息安全高的场景。

双主双从异步:与双主双从一致,只不过在主从数据传输是通过异步的方式,优点:客户端能快速的接收到服务器的消息,缺点:主节点挂的情况,从节点会丢失一部分消息。可用于允许消息丢失,吞吐量高的情景。

# 五.高级功能

# 1.消息存储的规程

分布式队列因为有高可靠性的要求,所以数据要进行持久化存储。

  1. 消息生成者发送消息
  2. MQ 收到消息,将消息进行持久化,在存储中新增一条记录
  3. 返回 ACK 给生产者
  4. MQ push 消息给对应的消费者,然后等待消费者返回 ACK
  5. 如果消息消费者在指定时间内成功返回 ack,那么 MQ 认为消息消费成功,在存储中删除消息,即执行第 6 步;如果 MQ 在指定时间内没有收到 ACK,则认为消息消费失败,会尝试重新 push 消息,重复执行 4、5、6 步骤
  6. MQ 删除消息

# 2.文件系统存储

目前业界较为常用的几款产品(RocketMQ/Kafka/RabbitMQ)均采用的是消息刷盘至所部署虚拟机/物理机的文件系统来做持久化(刷盘一般可以分为异步刷盘和同步刷盘两种模式)。消息刷盘为消息存储提供了一种高效率、高可靠性和高性能的数据持久化方式。除非部署 MQ 机器本身或是本地磁盘挂了,否则一般是不会出现无法持久化的故障问题。

# 3.消息存储

磁盘如果使用得当,磁盘的速度完全可以匹配上网络 的数据传输速度。目前的高性能磁盘,顺序写速度可以达到 600MB/s, 超过了一般网卡的传输速度。但是磁盘随机写的速度只有大概 100KB/s,和顺序写的性能相差 6000 倍!因为有如此巨大的速度差别,好的消息队列系统会比普通的消息队列系统速度快多个数量级。RocketMQ 的消息用顺序写,保证了消息存储的速度。

# 4.消息发送

Linux 操作系统分为【用户态】和【内核态】,文件操作、网络操作需要涉及这两种形态的切换,免不了进行数据复制。

一台服务器 把本机磁盘文件的内容发送到客户端,一般分为两个步骤:

1)read;读取本地文件内容;

2)write;将读取的内容通过网络发送出去。

这两个看似简单的操作,实际进行了 4 次数据复制,分别是:

  1. 从磁盘复制数据到内核态内存;
  2. 从内核态内存复 制到用户态内存;
  3. 然后从用户态 内存复制到网络驱动的内核态内存;
  4. 最后是从网络驱动的内核态内存复制到网卡中进行传输。

通过使用 mmap 的方式,可以省去向用户态的内存复制,提高速度。这种机制在 Java 中是通过 MappedByteBuffer 实现的

RocketMQ 充分利用了上述特性,也就是所谓的“零拷贝”技术,提高消息存盘和网络发送的速度。

这里需要注意的是,采用 MappedByteBuffer 这种内存映射的方式有几个限制,其中之一是一次只能映射 1.5~2G 的文件至用户态的虚拟内存,这也是为何 RocketMQ 默认设置单个 CommitLog 日志数据文件为 1G 的原因了

# 5.消息存储结构

RocketMQ 消息的存储是由 ConsumeQueueCommitLog 配合完成 的,消息真正的物理存储文件是 CommitLogConsumeQueue 是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址。每 个 Topic 下的每个 Message Queue 都有一个对应的 ConsumeQueue 文件。

  • CommitLog:存储消息的元数据
  • ConsumerQueue:存储消息在 CommitLog 的索引
  • IndexFile:为了消息查询提供了一种通过 key 或时间区间来查询消息的方法,这种通过 IndexFile 来查找消息的方法不影响发送与消费消息的主流程

RocketMQCommitLog 是一个非常重要的组件,用于存储所有的消息数据。当生产者发送消息时,消息会首先被写入 CommitLog 中,然后再被异步刷写到磁盘中。当消费者消费消息时,CommitLog 中的消息会被读取并发送给消费者。

CommitLog 中的消息是按照时间顺序存储的,每个消息都有一个唯一的物理偏移量。RocketMQ 使用物理偏移量来定位消息,因此可以快速地读取和发送消息。

CommitLog 的存储结构非常简单,由多个固定大小的文件组成,每个文件的大小可以通过 Broker 的配置进行调整。当一个文件写满后,RocketMQ 会自动创建一个新的文件,然后继续将消息写入新文件中。

CommitLog 的写入性能非常高,因为 RocketMQ 使用了零拷贝技术和内存映射文件来提高写入效率。同时,CommitLog 的读取性能也非常高,因为 RocketMQ 使用了缓存和预读技术来提高读取效率。

# 6.如何快速查询到需要的消息

在 RocketMQ 中,多个 CommitLog 是按照时间顺序存储的,每个 CommitLog 中的消息都有一个唯一的物理偏移量。当需要查询某个消息时,RocketMQ 可以通过以下步骤快速定位到该消息:

  1. 首先,RocketMQ 会根据消息的时间戳计算出该消息在哪个 CommitLog 中,因为多个 CommitLog 是按照时间顺序存储的。

  2. 然后,RocketMQ 会根据消息的物理偏移量在对应的 CommitLog 中查找该消息。

  3. 如果该 CommitLog 中没有该消息,RocketMQ 会继续在下一个 CommitLog 中查找,直到找到该消息为止。

RocketMQ 还提供了索引文件来加速消息的查找。索引文件 ConsumerQueue 中存储了每个 CommitLog 中的消息的索引信息,包括消息的偏移量、时间戳和消息所在的文件名。当需要查询某个消息时,RocketMQ 可以先在索引文件 ConsumerQueue 中查找该消息所在的 CommitLog 和偏移量,然后再在对应的 CommitLog 中查找该消息。

# 7.刷盘机制

RocketMQ 的消息是存储到磁盘上的,这样既能保证断电后恢复, 又可以让存储的消息量超出内存的限制。RocketMQ 为了提高性能,会尽可能地保证磁盘的顺序写。消息在通过 Producer 写入 RocketMQ 的时候,有两种写磁盘方式,分布式同步刷盘和异步刷盘。

同步刷盘:

在返回写成功状态时,消息已经被写入磁盘。具体流程是,消息写入内存的 PAGECACHE 后,立刻通知刷盘线程刷盘, 然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态。

异步刷盘:

在返回写成功状态时,消息可能只是被写入了内存的 PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘动作,快速写入。

配置:

同步刷盘还是异步刷盘,都是通过 Broker 配置文件里的 flushDiskType 参数设置的,这个参数被配置成 SYNC_FLUSHASYNC_FLUSH 中的 一个。

# 8.高可用性机制

RocketMQ 分布式集群是通过 MasterSlave 的配合达到高可用性的。

Master 和 Slave 的区别:在 Broker 的配置文件中,参数 brokerId 的值为 0 表明这个 Broker 是 Master,大于 0 表明这个 Broker 是 Slave,同时 brokerRole 参数也会说明这个 Broker 是 Master 还是 Slave。

Master 角色的 Broker 支持读和写,Slave 角色的 Broker 仅支持读,也就是 Producer 只能和 Master 角色的 Broker 连接写入消息;Consumer 可以连接 Master 角色的 Broker,也可以连接 Slave 角色的 Broker 来读取消息。

消息消费高可用:

在 Consumer 的配置文件中,并不需要设置是从 Master 读还是从 Slave 读,当 Master 不可用或者繁忙的时候,Consumer 会被自动切换到从 Slave 读。有了自动切换 Consumer 这种机制,当一个 Master 角色的机器出现故障后,Consumer 仍然可以从 Slave 读取消息,不影响 Consumer 程序。这就达到了消费端的高可用性。

消息发送高可用:

在创建 Topic 的时候,把 Topic 的多个 Message Queue 创建在多个 Broker 组上(相同 Broker 名称,不同 brokerId 的机器组成一个 Broker 组),这样当一个 Broker 组的 Master 不可用后,其他组的 Master 仍然可用,Producer 仍然可以发送消息。RocketMQ 目前还不支持把 Slave 自动转成 Master,如果机器资源不足, 需要把 Slave 转成 Master,则要手动停止 Slave 角色的 Broker,更改配置文 件,用新的配置文件启动 Broker。

# 9.消息主从复制

如果一个 Broker 组有 Master 和 Slave,消息需要从 Master 复制到 Slave 上,有同步和异步两种复制方式。

同步复制:

同步复制方式是等 Master 和 Slave 均写成功后才反馈给客户端写成功状态;

在同步复制方式下,如果 Master 出故障, Slave 上有全部的备份数据,容易恢复,但是同步复制会增大数据写入 延迟,降低系统吞吐量。

异步复制:

异步复制方式是只要 Master 写成功 即可反馈给客户端写成功状态。

在异步复制方式下,系统拥有较低的延迟和较高的吞吐量,但是如果 Master 出了故障,有些数据因为没有被写 入 Slave,有可能会丢失;

配置:

同步复制和异步复制是通过 Broker 配置文件里的 brokerRole 参数进行设置的,这个参数可以被设置成 ASYNC_MASTERSYNC_MASTERSLAVE 三个值中的一个。主节点有 2 个值可以选择,从节点只能选择 SLAVE

实际应用中要结合业务场景,合理设置刷盘方式和主从复制方式, 尤其是 SYNC_FLUSH 方式,由于频繁地触发磁盘写动作,会明显降低性能。通常情况下,应该把 broker Master 和 Save 配置成 ASYNC_FLUSH 的异步刷盘方式,消息主从之间配置成 SYNC_MASTER 的复制方式,这样即使有一台 机器出故障,仍然能保证数据不丢,是个不错的选择。

# 10.Producer 负载均衡

Producer 端,每个实例在发消息的时候,默认会轮询所有的 message queue 发送,以达到让消息平均落在不同的 queue 上。而由于 queue 可以散落在不同的 broker,所以消息就发送到不同的 broker 下

在 Apache RocketMQ 中,Producer 的负载均衡是指消息生产者在向不同的消息队列发送消息时,如何均匀地分布负载,以提高系统的性能和可伸缩性。RocketMQ 提供了一种负载均衡策略,即选择消息队列的算法,以确保消息发送者将消息发送到不同的队列上,从而实现负载均衡。这对于分布式消息系统来说尤为重要,可以避免某些队列负载过重,而其他队列却处于低负载状态的情况。

RocketMQ 的负载均衡是通过选择消息队列的算法来实现的。默认情况下,RocketMQ 提供了以下两种负载均衡策略:

  1. 轮询(Round Robin):每个消息生产者依次将消息发送到不同的队列,按照队列的顺序进行轮流分配。

  2. 随机(Random):每个消息生产者随机选择一个队列发送消息,以实现均衡分配。

在使用 RocketMQ 的 Producer 时,可以根据需要选择适合的负载均衡策略。负载均衡策略可以在创建 Producer 实例时进行配置。以下是一个示例代码,展示了如何在 RocketMQ 中配置 Producer 的负载均衡策略:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;

public class ProducerExample {
    public static void main(String[] args) throws Exception {
        // 创建一个消息生产者实例
        DefaultMQProducer producer = new DefaultMQProducer("example_group");

        // 设置NameServer的地址
        producer.setNamesrvAddr("localhost:9876");

        // 设置负载均衡策略为轮询
        producer.setSendMsgsAcquireOrderly(true); // 设置为 true 表示启用负载均衡策略

        // 启动消息生产者
        producer.start();

        // 创建消息实例
        Message message = new Message("example_topic", "Hello RocketMQ".getBytes());

        // 发送消息
        SendResult result = producer.send(message);
        if (result.getSendStatus() == SendStatus.SEND_OK) {
            System.out.println("消息发送成功");
        }

        // 关闭消息生产者
        producer.shutdown();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

上述示例中,通过调用 setSendMsgsAcquireOrderly(true) 方法将负载均衡策略设置为轮询(Round Robin)。您还可以根据需要选择其他策略,比如随机分配。

需要注意的是,RocketMQ 的负载均衡策略仅适用于同一消息生产者实例发送到同一主题的情况。如果您使用多个消息生产者实例,或者涉及到不同主题,负载均衡将由 RocketMQ 服务器根据配置进行处理。

# 11.Consumer 负载均衡

集群模式:

在集群消费模式下,每条消息只需要投递到订阅这个 topic 的 Consumer Group 下的一个实例即可。RocketMQ 采用主动拉取的方式拉取并消费消息,在拉取的时候需要明确指定拉取哪一条 message queue。

而每当实例的数量有变更,都会触发一次所有实例的负载均衡,这时候会按照 queue 的数量和实例的数量平均分配 queue 给每个实例。当有新的 Consumer 加入或者有 Consumer 下线时,RocketMQ 会重新计算每个 Consumer 需要消费的 Message Queue,并将它们分配给对应的 Consumer。

RoundRobin负载均衡策略:

默认的分配算法是 AllocateMessageQueueAveragely

还有另外一种平均的算法是 AllocateMessageQueueAveragelyByCircle,也是平均分摊每一条 queue,只是以环状轮流分 queue 的形式.

需要注意的是,集群模式下,queue 都是只允许分配给一个实例,这是由于如果多个实例同时消费一个 queue 的消息,由于拉取哪些消息是 consumer 主动控制的,那样会导致同一个消息在不同的实例下被消费多次,所以算法上都是一个 queue 只分给一个 consumer 实例,一个 consumer 实例可以允许同时分到不同的 queue。

通过增加 consumer 实例去分摊 queue 的消费,可以起到水平扩展的消费能力的作用。而有实例下线的时候,会重新触发负载均衡,这时候原来分配到的 queue 将分配到其他实例上继续消费。

但是如果 consumer 实例的数量比 message queue 的总数量还多的话(1 比 1 还多),多出来的 consumer 实例将无法分到 queue,也就无法消费到消息,也就无法起到分摊负载的作用了。所以需要控制让 queue 的总数量大于等于 consumer 的数量。

ConsistentHash负载均衡策略:

ConsistentHash 负载均衡策略会根据消息的 Key 值将消息分配给对应的 Message Queue,然后再将每个 Message Queue 分配给对应的 Consumer。这种负载均衡策略可以保证相同 Key 值的消息被分配到同一个 Message Queue 中,从而保证消息的有序性。当有新的 Consumer 加入或者有 Consumer 下线时,RocketMQ 会重新计算每个 Message Queue 应该分配给哪个 Consumer。

广播模式:

由于广播模式下要求一条消息需要投递到一个消费组下面所有的消费者实例,所以也就没有消息被分摊消费的说法。

在实现上,其中一个不同就是在 consumer 分配 queue 的时候,所有 consumer 都分到所有的 queue。

实现原理:

在 RocketMQ 中,Consumer Group 是一组消费者的集合,它们共同消费同一个 Topic 下的消息。Consumer Group 的作用是实现消息的负载均衡和高可用性。具体来说,一个 Consumer Group 中的每个消费者都会订阅 Topic 下的所有消息,但是每个消息只会被 Consumer Group 中的一个消费者消费,这样就可以实现消息的负载均衡。同时,如果 Consumer Group 中的某个消费者出现故障或者下线,其他消费者可以接替它消费该消费者未消费的消息,从而实现高可用性。

需要注意的是,一个 Topic 下可以有多个 Consumer Group,每个 Consumer Group 都有自己的消费者集合,它们之间相互独立,不会相互影响。如果多个 Consumer Group 订阅同一个 Topic,那么该 Topic 下的每个消息都会被所有 Consumer Group 中的一个消费者消费一次。这个特性可以用于实现消息的广播。

# 12.消息的重试

顺序消息的重试:

对于顺序消息,当消费者消费消息失败后,消息队列 RocketMQ 会自动不断进行消息重试(每次间隔时间为 1 秒),这时,应用会出现消息消费被阻塞的情况。因此,在使用顺序消息时,务必保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的发生。

无序消息的重试:

对于无序消息(普通、定时、延时、事务消息),当消费者消费消息失败时,您可以通过设置返回状态达到消息重试的结果。

无序消息的重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。

重试次数:

消息队列 RocketMQ 默认允许每条消息最多重试 16 次,每次重试的间隔时间如下:

第几次重试 与上次重试的间隔时间 第几次重试 与上次重试的间隔时间
1 10 秒 9 7 分钟
2 30 秒 10 8 分钟
3 1 分钟 11 9 分钟
4 2 分钟 12 10 分钟
5 3 分钟 13 20 分钟
6 4 分钟 14 30 分钟
7 5 分钟 15 1 小时
8 6 分钟 16 2 小时

如果消息重试 16 次后仍然失败,消息将不再投递。如果严格按照上述重试时间间隔计算,某条消息在一直消费失败的前提下,将会在接下来的 4 小时 46 分钟之内进行 16 次重试,超过这个时间范围消息将不再重试投递。

注意: 一条消息无论重试多少次,这些重试消息的 Message ID 不会改变。

配置方式:

消费失败后,重试配置方式

集群消费方式下,消息消费失败后期望消息重试,需要在消息监听器接口的实现中明确进行配置(三种方式任选一种):

  • 返回 Action.ReconsumeLater (推荐)
  • 返回 Null
  • 抛出异常
public class MessageListenerImpl implements MessageListener {
    @Override
    public Action consume(Message message, ConsumeContext context) {
        //处理消息
        doConsumeMessage(message);
        //方式1:返回 Action.ReconsumeLater,消息将重试
        return Action.ReconsumeLater;
        //方式2:返回 null,消息将重试
        return null;
        //方式3:直接抛出异常, 消息将重试
        throw new RuntimeException("Consumer Message exceotion");
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13

Action.ReconsumeLater 是 RocketMQ 中的一种消息消费结果,表示消费者无法处理当前的消息,需要稍后再次消费。具体来说,当消费者在消费消息时发生异常或者返回 RECONSUME_LATER 结果时,RocketMQ 会将该消息重新投递到 Broker,然后根据消息的重试次数和重试时间间隔进行重试,直到达到最大重试次数或者消息被消费成功为止。这个机制可以保证消息不会因为消费者出现问题而丢失,同时也可以保证消息的可靠性。

消费失败后,不重试配置方式

集群消费方式下,消息失败后期望消息不重试,需要捕获消费逻辑中可能抛出的异常,最终返回 Action.CommitMessage,此后这条消息将不会再重试。

public class MessageListenerImpl implements MessageListener {
    @Override
    public Action consume(Message message, ConsumeContext context) {
        try {
            doConsumeMessage(message);
        } catch (Throwable e) {
            //捕获消费逻辑中的所有异常,并返回 Action.CommitMessage;
            return Action.CommitMessage;
        }
        //消息处理正常,直接返回 Action.CommitMessage;
        return Action.CommitMessage;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13

自定义消息最大重试次数

消息队列 RocketMQ 允许 Consumer 启动的时候设置最大重试次数,重试时间间隔将按照如下策略:

  • 最大重试次数小于等于 16 次,则重试时间间隔同上表描述。
  • 最大重试次数大于 16 次,超过 16 次的重试时间间隔均为每次 2 小时。

消息队列 RocketMQ 默认允许每条消息最多重试 16 次,每次重试的间隔时间如下:

第几次重试 与上次重试的间隔时间 第几次重试 与上次重试的间隔时间
1 10 秒 9 7 分钟
2 30 秒 10 8 分钟
3 1 分钟 11 9 分钟
4 2 分钟 12 10 分钟
5 3 分钟 13 20 分钟
6 4 分钟 14 30 分钟
7 5 分钟 15 1 小时
8 6 分钟 16 2 小时
Properties properties = new Properties();
//配置对应 Group ID 的最大消息重试次数为 20 次
properties.put(PropertyKeyConst.MaxReconsumeTimes,"20");
Consumer consumer =ONSFactory.createConsumer(properties);
1
2
3
4

注意:

  • 消息最大重试次数的设置对相同 Group ID 下的所有 Consumer 实例有效。
  • 如果只对相同 Group ID 下两个 Consumer 实例中的其中一个设置了 MaxReconsumeTimes,那么该配置对两个 Consumer 实例均生效。
  • 配置采用覆盖的方式生效,即最后启动的 Consumer 实例会覆盖之前的启动实例的配置

获取消息重试次数

消费者收到消息后,可按照如下方式获取消息的重试次数:

public class MessageListenerImpl implements MessageListener {
    @Override
    public Action consume(Message message, ConsumeContext context) {
        //获取消息的重试次数
        System.out.println(message.getReconsumeTimes());
        return Action.CommitMessage;
    }
}
1
2
3
4
5
6
7
8

# 13.死信队列

当一条消息初次消费失败,消息队列 RocketMQ 会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 RocketMQ 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。

在消息队列 RocketMQ 中,这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。

死信队列特征:

死信消息具有以下特性

  • 不会再被消费者正常消费。
  • 有效期与正常消息相同,均为 3 天,3 天后会被自动删除。因此,请在死信消息产生后的 3 天内及时处理。

死信队列具有以下特性:

  • 一个死信队列对应一个 Group ID, 而不是对应单个消费者实例。
  • 如果一个 Group ID 未产生死信消息,消息队列 RocketMQ 不会为其创建相应的死信队列。
  • 一个死信队列包含了对应 Group ID 产生的所有死信消息,不论该消息属于哪个 Topic。

查看死信信息:

在控制台查询出现死信队列的主题信息

在消息界面根据主题查询死信消息

选择重新发送消息:

一条消息进入死信队列,意味着某些因素导致消费者无法正常消费该消息,因此,通常需要您对其进行特殊处理。排查可疑因素并解决问题后,可以在消息队列 RocketMQ 控制台重新发送该消息,让消费者重新消费一次。

# 14.消费幂等

消息队列 RocketMQ 消费者在接收到消息以后,有必要根据业务上的唯一 Key 对消息做幂等处理的必要性。

消费幂等的必要性:

在互联网应用中,尤其在网络不稳定的情况下,消息队列 RocketMQ 的消息有可能会出现重复,这个重复简单可以概括为以下情况:

  • 发送时消息重复:当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。 如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。

  • 投递时消息重复:消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。 为了保证消息至少被消费一次,消息队列 RocketMQ 的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。

  • 负载均衡时消息重复:(包括但不限于网络抖动、Broker 重启以及订阅方应用重启)当消息队列 RocketMQ 的 Broker 或客户端重启、扩容或缩容时,会触发 Rebalance,此时消费者可能会收到重复消息。

处理方式:

因为 Message ID 有可能出现冲突(重复)的情况,所以真正安全的幂等处理,不建议以 Message ID 作为处理依据。 最好的方式是以业务唯一标识作为幂等处理的关键依据,而业务的唯一标识可以通过消息 Key 进行设置:

Message message = new Message();
message.setKey("ORDERID_100");
SendResult sendResult = producer.send(message);
1
2
3

订阅方收到消息时可以根据消息的 Key 进行幂等处理:

consumer.subscribe("ons_test", "*", new MessageListener() {
    public Action consume(Message message, ConsumeContext context) {
        String key = message.getKey()
        //根据业务唯一标识的 key 做幂等处理,判断这个key之间是否处理过
    }
});
1
2
3
4
5
6

# 六.RocketMQ 零拷贝

# 1.PageCache 原理

PageCache

  • 由内存中的物理 page 组成,其内容对应磁盘上的 block。
  • PageCache 的大小是动态变化的。
  • backing store: cache 缓存的存储设备
  • 一个 page 通常包含多个 block, 而 block 不一定是连续的

读 Cache:当内核发起一个读请求时, 先会检查请求的数据是否缓存到了 page cache 中。如果有,那么直接从内存中读取,不需要访问磁盘, 此即 cache hit(缓存命中)。如果没有, 就必须从磁盘中读取数据, 然后内核将读取的数据再缓存到 cache 中, 如此后续的读请求就可以命中缓存了。page 可以只缓存一个文件的部分内容, 而不需要把整个文件都缓存进来。

写 Cache:当内核发起一个写请求时, 也是直接往 cache 中写入, 后备存储中的内容不会直接更新。内核会将被写入的 page 标记为 dirty, 并将其加入到 dirty list 中。内核会周期性地将 dirty list 中的 page 写回到磁盘上, 从而使磁盘上的数据和内存中缓存的数据一致。

cache 回收:Page cache 的另一个重要工作是释放 page, 从而释放内存空间。cache 回收的任务是选择合适的 page 释放。如果 page 是 dirty 的, 需要将 page 写回到磁盘中再释放。

# 2.cache 和 buffer 的区别

Cache:缓存区,是高速缓存,是位于 CPU 和主内存之间的容量较小但速度很快的存储器,因为 CPU 的速度远远高于主内存的速度,CPU 从内存中读取数据需等待很长的时间,而 Cache 保存着 CPU 刚用过的数据或循环使用的部分数据,这时从 Cache 中读取数据会更快,减少了 CPU 等待的时间,提高了系统的性能。

Cache 并不是缓存文件的,而是缓存块的(块是 I/O 读写最小的单元);Cache 一般会用在 I/O 请求上,如果多个进程要访问某个文件,可以把此文件读入 Cache 中,这样下一个进程获取 CPU 控制权并访问此文件直接从 Cache 读取,提高系统性能。

Buffer:缓冲区,用于存储速度不同步的设备或优先级不同的设备之间传输数据;通过 buffer 可以减少进程间通信需要等待的时间,当存储速度快的设备与存储速度慢的设备进行通信时,存储慢的数据先把数据存放到 buffer,达到一定程度存储快的设备再读取 buffer 的数据,在此期间存储快的设备 CPU 可以干其他的事情。

Buffer 一般是用在写入磁盘的,例如:某个进程要求多个字段被读入,当所有要求的字段被读入之前已经读入的字段会先放到 buffer 中。

# 3.HeapByteBuffer 和 DirectByteBuffer

HeapByteBuffer,是在 jvm 堆上面一个 buffer,底层的本质是一个数组,用类封装维护了很多的索引(limit/position/capacity 等)。

DirectByteBuffer,底层的数据是维护在操作系统的内存中,而不是 jvm 里,DirectByteBuffer 里维护了一个引用 address 指向数据,进而操作数据。

HeapByteBuffer 优点:内容维护在 jvm 里,把内容写进 buffer 里速度快;更容易回收

DirectByteBuffer 优点:跟外设(IO 设备)打交道时会快很多,因为外设读取 jvm 堆里的数据时,不是直接读取的,而是把 jvm 里的数据读到一个内存块里,再在这个块里读取的,如果使用 DirectByteBuffer,则可以省去这一步,实现 zero copy(零拷贝)

# 4.堆外内存实现零拷贝

  1. 前者分配在 JVM 堆上ByteBuffer.allocate(),后者分配在操作系统物理内存ByteBuffer.allocateDirect(),JVM 使用 C 库中的 malloc()方法分配堆外内存);
  2. DirectByteBuffer 可以减少 JVM GC 压力,当然,堆中依然保存对象引用,fullgc 发生时也会回收直接内存,也可以通过 system.gc 主动通知 JVM 回收,或者通过 cleaner.clean 主动清理。Cleaner.create()方法需要传入一个 DirectByteBuffer 对象和一个 Deallocator(一个堆外内存回收线程)。GC 发生时发现堆中的 DirectByteBuffer 对象没有强引用了,则调用 Deallocator 的 run()方法回收直接内存,并释放堆中 DirectByteBuffer 的对象引用;
  3. 底层 I/O 操作需要连续的内存(JVM 堆内存容易发生 GC 和对象移动),所以在执行 write 操作时需要将 HeapByteBuffer 数据拷贝到一个临时的(操作系统用户态)内存空间中,会多一次额外拷贝。而 DirectByteBuffer 则可以省去这个拷贝动作,这是 Java 层面的 “零拷贝” 技术,在 netty 中广泛使用;
  4. MappedByteBuffer 底层使用了操作系统的 mmap 机制,FileChannel#map()方法就会返回 MappedByteBufferDirectByteBuffer 虽然实现了 MappedByteBuffer,不过 DirectByteBuffer 默认并没有直接使用 mmap 机制。

# 5.内存映射文件(Mmap)

在 LINUX 中我们可以使用 mmap 用来在进程虚拟内存地址空间中分配地址空间,创建和物理内存的映射关系。

image-20230813205810133

映射关系可以分为两种

  • 文件映射 磁盘文件映射进程的虚拟地址空间,使用文件内容初始化物理内存。

  • 匿名映射 初始化全为 0 的内存空间。

而对于映射关系是否共享又分为

  • 私有映射(MAP_PRIVATE) 多进程间数据共享,修改不反应到磁盘实际文件,是一个 copy-on-write(写时复制)的映射方式。
  • 共享映射(MAP_SHARED) 多进程间数据共享,修改反应到磁盘实际文件中。

因此总结起来有 4 种组合

  • 私有文件映射 多个进程使用同样的物理内存页进行初始化,但是各个进程对内存文件的修改不会共享,也不会反应到物理文件中

  • 私有匿名映射 mmap 会创建一个新的映射,各个进程不共享,这种使用主要用于分配内存(malloc 分配大内存会调用 mmap)。 例如开辟新进程时,会为每个进程分配虚拟的地址空间,这些虚拟地址映射的物理内存空间各个进程间读的时候共享,写的时候会 copy-on-write。

  • 共享文件映射 多个进程通过虚拟内存技术共享同样的物理内存空间,对内存文件 的修改会反应到实际物理文件中,他也是进程间通信(IPC)的一种机制。

  • 共享匿名映射 这种机制在进行 fork 的时候不会采用写时复制,父子进程完全共享同样的物理内存页,这也就实现了父子进程通信(IPC).

mmap 只是在虚拟内存分配了地址空间,只有在第一次访问虚拟内存的时候才分配物理内存。

在 mmap 之后,并没有在将文件内容加载到物理页上,只上在虚拟内存中分配了地址空间。当进程在访问这段地址时,通过查找页表,发现虚拟内存对应的页没有在物理内存中缓存,则产生"缺页",由内核的缺页异常处理程序处理,将文件对应内容,以页为单位(4096)加载到物理内存,注意是只加载缺页,但也会受操作系统一些调度策略影响,加载的比所需的多。

# 6.直接内存读写过程

直接内存读取并发送文件的过程

image-20230813210040880

# 7.Mmap 读写过程

Mmap 读取并发送文件的过程:

  • 省去了从内核空间到用户空间的拷贝.
  • 应用程序映射到了内核缓冲区,读取的是映射的地址,不需要数据拷贝

"Mmap"(Memory-mapped files)是一种在内存和文件之间建立映射关系的机制,它允许应用程序通过内存访问文件的内容,而无需显式的读取或写入操作。在这个过程中,文件的内容被映射到了进程的地址空间,从而使得文件的读取和写入操作变得更加高效。以下是使用 Mmap 读取并发送文件的一般过程:

  1. 打开文件: 首先,应用程序需要打开要操作的文件。这可以通过标准的文件 I/O 函数(如 open)来实现。

  2. 获取文件大小: 在使用 Mmap 之前,需要获取文件的大小,以便映射适当的内存空间。这可以通过文件的元数据(如 stat)获得。

  3. 创建 Mmap 映射: 使用 mmap 函数创建一个映射区域,将文件内容映射到进程的地址空间中。mmap 函数接受参数包括文件描述符、映射的长度、访问权限等。

  4. 读取文件内容: 一旦映射建立,应用程序可以通过访问内存中的映射区域来读取文件内容,就像访问常规内存一样。这样的读取操作可以通过指针操作来实现。

  5. 发送文件内容: 读取到文件内容后,应用程序可以将数据发送到目标地点,比如网络套接字、消息队列等。

  6. 解除映射并关闭文件: 读取和发送操作完成后,需要使用 munmap 函数解除内存映射,然后关闭文件描述符。

image-20230813210056683

# 8.Sendfile 读写过程

Sendfile 零拷贝读取并发送文件的过程

  • 省去了映射的过程,用户空间交互更少.
  • 内核缓冲区映射到 Socket 缓冲区

sendfile 是一种在 Linux 系统上实现零拷贝(Zero-Copy)的机制,它允许将一个文件的数据直接从文件系统传输到网络套接字,而无需将数据从文件复制到用户空间再复制到内核空间。这可以显著提高数据传输的效率。以下是使用 sendfile 实现零拷贝读取并发送文件的一般过程:

  1. 打开文件和套接字: 首先,应用程序需要打开要操作的文件和网络套接字。

  2. 读取和发送文件内容: 使用 sendfile 函数将文件的内容直接从文件描述符传输到套接字。sendfile 函数接受源文件描述符、目标套接字、发送的字节数等参数。这样,文件内容会被直接复制到套接字的发送缓冲区,绕过了用户空间。

  3. 循环传输: 如果文件很大,可以通过循环调用 sendfile 来重复执行直到整个文件的内容都被发送。

  4. 关闭文件和套接字: 读取和发送操作完成后,需要关闭文件描述符和网络套接字。

使用 sendfile 可以在一定程度上减少数据传输过程中的数据复制操作,从而提高性能和效率。

image-20230813210113288

# 9.零拷贝小结

  • 虽然叫零拷贝,实际上 sendfile 有 2 次数据拷贝的。第 1 次是从磁盘拷贝到内核缓冲区,第二次是从内核缓冲区拷贝到网卡(协议引擎)。如果网卡支持 SG-DMA(The Scatter-Gather Direct Memory Access)技术,就无需 PageCache 拷贝至 Socket 缓冲区;
  • 之所以叫零拷贝,是从内存角度来看的,数据在内存中没有发生过拷贝,只是在内存I/O设备之间传输。很多时候我们认为 sendfile 才是零拷贝,mmap 严格来说不算;
  • Linux 中的 API 为 sendfilemmap,Java 中的 API 为 FileChanel.transferTo()、FileChannel.map()等;
  • Netty、Kafka(sendfile,mmap)、Rocketmq(mmap,sendfile)、Nginx 等高性能中间件中,都有大量利用操作系统零拷贝特性。

# 10.RocketMQ 中的零拷贝

在 Apache RocketMQ 中,使用零拷贝(Zero-Copy)技术可以提高消息传输的效率,减少数据在用户空间和内核空间之间的复制操作,从而提升性能。RocketMQ 在一些场景中使用了零拷贝技术,主要体现在消息的存储和传输过程中。以下是 RocketMQ 中的零拷贝相关要点总结:

  1. 消息存储零拷贝: RocketMQ 使用了零拷贝技术来存储消息在 Broker 服务器上。消息以一种内部数据结构存储,该数据结构能够避免将消息内容从用户空间复制到内核空间,从而提高存储的效率。

  2. 文件刷盘零拷贝: RocketMQ 通过使用零拷贝技术来实现消息的文件刷盘(Flush),这在消息持久化和可靠性方面非常重要。通过将消息的字节数据从内核空间直接写入磁盘,避免了复制操作,提高了刷盘的性能。

  3. sendfile 零拷贝传输: RocketMQ 在消息的传输过程中,特别是消息拉取的过程中,采用了 sendfile 系统调用实现零拷贝传输。这允许将消息从文件系统直接发送到网络套接字,减少了数据在用户空间和内核空间之间的拷贝。

  4. 数据序列化和反序列化: 尽管 RocketMQ 使用了零拷贝来减少数据在内核和用户空间之间的拷贝,但仍需要对数据进行序列化和反序列化操作,以便将消息从字节表示转换为消息对象或从消息对象转换为字节表示。

RocketMQ 在存储和传输消息的过程中充分利用了零拷贝技术,通过减少数据的复制操作,提高了性能和效率。这对于一个高吞吐量、低延迟的分布式消息中间件来说,是非常重要的优化手段之一。

# 七.常见问题

# 1.RPC 通信

RocketMQ 集群的一部分通信如下:

  1. Broker 启动后需要完成一次将自己注册至 NameServer 的操作;随后每隔 30s 时间定期向 NameServer 上报 Topic 路由信息;
  2. 消息生产者 Producer 作为客户端发送消息时候,需要根据 Msg 的 Topic 从本地缓存的 TopicPublishInfoTable 获取路由信息。如果没有则更新路由信息会从 NameServer 上重新拉取;
  3. 消息生产者 Producer 根据(2)中获取的路由信息选择一个队列(MessageQueue)进行消息发送;Broker 作为消息的接收者收消息并落盘存储; 从上面(1)~(3)中可以看出在消息生产者, Broker 和 NameServer 之间都会发生通信(这里只说了 MQ 的部分通信),因此如何设计一个良好的网络通信模块在 MQ 中至关重要,它将决定 RocketMQ 集群整体的消息传输能力与最终的性能。 rocketmq-remoting 模块是 RocketMQ 消息队列中负责网络通信的模块,它几乎被其他所有需要网络通信的模块(诸如 rocketmq-client、rocketmq-server、rocketmq-namesrv)所依赖和引用。为了实现客户端与服务器之间高效的数据请求与接收,RocketMQ 消息队列自定义了通信协议并在 Netty 的基础之上扩展了通信模块。ps:鉴于 RocketMQ 的通信模块是建立在 Netty 基础之上的

# 2.RabbitMQ 的 QPS 一般较低?

RabbitMQ 的 QPS(每秒请求数)较低可能受到以下一些因素的影响:

  1. 消息持久性: 默认情况下,RabbitMQ 将消息存储到磁盘以保证持久性。这可能导致相对较低的写入性能,因为磁盘写入通常比内存操作慢。

  2. 单线程模型: RabbitMQ 在内部使用单线程来处理消息的分发和处理,这可能会在高负载情况下导致性能瓶颈。尤其是在大量连接、队列和消费者的情况下,单线程可能成为限制因素。

  3. 消息确认机制: RabbitMQ 支持丰富的消息确认机制,包括生产者确认和消费者确认,以确保消息的可靠性传递。这些确认机制会增加一些额外的开销,对于需要强调可靠性的场景,可能会影响性能。

  4. 持久化策略: RabbitMQ 默认情况下采用了 fsync 持久化策略,即消息被写入磁盘后需要进行同步磁盘操作,这会影响写入性能。可以通过调整持久化策略来平衡性能和可靠性。

  5. 资源限制: 如果 RabbitMQ 所在的服务器资源有限,如 CPU、内存等,那么性能可能会受到限制。资源不足可能导致消息处理速度变慢。

  6. 消息体积和复杂性: 如果消息的体积较大或者消息的处理逻辑复杂,都可能导致 RabbitMQ 的处理速度下降。

需要指出的是,RabbitMQ 作为一个可靠的消息中间件,更注重消息的可靠性传递和持久性,而不是追求极高的吞吐量。如果对于高吞吐量是一个更重要的需求,可能需要考虑其他的消息中间件,比如 Apache Kafka 或者 Apache RocketMQ,它们在设计上更加注重高性能的消息传递。

# 3.slave 和 master

slave 挂了

  • slave 挂了对系统影响不大,因为 master 是处理写请求的
  • slave 挂了,系统只是只能从 master 获取消息了

master 挂了

  • 4.5 版本之前的是不能自动选举出来 master 的,只能通过运维手工启动,会有一段时间是不可用的
  • 4.5 版本之后,通过 Dledger 机制自动选举出来 master,选举时间十几秒,基于 Raft 协议

# 4.NS 为什么不需要节点同步

NameServer 为什么不需要节点同步

RocketMQ 中的 NameServer 不需要节点同步,原因与 DNS 类似。NameServer 主要负责消息生产者和消费者的注册和发现,它并不存储消息数据,而是存储了一些元数据,例如主题(Topic)和消费者组(Consumer Group)等信息。这些元数据通常不会频繁变动,而且可以通过广播的方式快速地同步到所有的 Broker 节点上,因此不需要进行节点同步。

当一个生产者或消费者启动时,它会向 NameServer 发送一个注册请求,NameServer 会将其注册信息存储在内存中,并将其广播给所有的 Broker 节点。当一个消费者组订阅了某个主题时,NameServer 会将该主题的信息发送给该消费者组所在的所有 Broker 节点。这种方式可以保证消息生产者和消费者能够快速地发现彼此,并进行消息的传输和处理。

# 5.为什么不用 Zookeeper

RocketMQ 为什么不使用 ZooKeeper 而自己开发 NameServer? 在服务发现领域,ZooKeeper 根本就不能算是最佳的选择。

1.注册中心是 CP 还是 AP 系统?

在分布式系统中,即使是对等部署的服务,因为请求到达的时间,硬件的状态,操作系统的调度,虚拟机的 GC 等,任何一个时间点,这些对等部署的节点状态也不可能完全一致,而流量不一致的情况下,只要注册中心在 A 承诺的时间内(例如 1s 内)将数据收敛到一致状态(即满足最终一致),流量将很快趋于统计学意义上的一致,所以注册中心以最终一致的模型设计在生产实践中完全可以接受。

2.注册中心不影响服务

分区容忍及可用性需求分析实践中,注册中心不能因为自身的任何原因破坏服务之间本身的可连通性,这是注册中心设计应该遵循的铁律! 在 CAP 的权衡中,注册中心的可用性比数据强一致性更宝贵,所以整体设计更应该偏向 AP,而非 CP,数据不一致在可接受范围,而 P 下舍弃 A 却完全违反了注册中心不能因为自身的任何原因破坏服务本身的可连通性的原则。

3.服务规模、容量、服务联通性

当数据中心服务规模超过一定数量,作为注册中心的 ZooKeeper 性能堪忧。 在服务发现和健康监测场景下,随着服务规模的增大,无论是应用频繁发布时的服务注册带来的写请求,还是刷毫秒级的服务健康状态带来的写请求,还是恨不能整个数据中心的机器或者容器皆与注册中心有长连接带来的连接压力上,ZooKeeper 很快就会力不从心,而 ZooKeeper 的写并不是可扩展的,不可以通过加节点解决水平扩展性问题。

4.注册中心需要持久存储和事务日志么? 需要,也不需要。

在服务发现场景中,其最核心的数据--实时的健康的服务的地址列表,真的需要数据持久化么? 不需要在服务发现中,服务调用发起方更关注的是其要调用的服务的实时的地址列表和实时健康状态,每次发起调用时,并不关心要调用的服务的历史服务地址列表、过去的健康状态。

但是一个完整的生产可用的注册中心,除了服务的实时地址列表以及实时的健康状态之外,还会存储一些服务的元数据信息,例如服务的版本,分组,所在的数据中心,权重,鉴权策略信息,服务标签等元数据,这些数据需要持久化存储,并且注册中心应该提供对这些元数据的检索的能力。

5.服务健康检查

使用 ZooKeeper 作为服务注册中心时,服务的健康检测绑定在了 ZooKeeper 对于 Session 的健康监测上,或者说绑定在 TCP 长链接活性探测上了。

ZK 与服务提供者机器之间的 TCP 长链接活性探测正常的时候,该服务就是健康的么?答案当然是否定的!注册中心应该提供更丰富的健康监测方案,服务的健康与否的逻辑应该开放给服务提供方自己定义,而不是一刀切搞成了 TCP 活性检测.

健康检测的一大基本设计原则就是尽可能真实的反馈服务本身的真实健康状态,否则一个不敢被服务调用者相信的健康状态判定结果还不如没有健康检测。

6.注册中心的容灾考虑

如果注册中心(Registry)本身完全宕机了,服务调用链路应该受到影响么? 不应该受到影响。 服务调用(请求响应流)链路应该是弱依赖注册中心,必须仅在服务发布,机器上下线,服务扩缩容等必要时才依赖注册中心。这需要注册中心仔细的设计自己提供的客户端,客户端中应该有针对注册中心服务完全不可用时做容灾的手段,例如设计客户端缓存数据机制就是行之有效的手段。另外,注册中心的健康检查机制也要仔细设计以便在这种情况不会出现诸如推空等情况的出现。 ZooKeeper 的原生客户端并没有这种能力,所以利用 ZooKeeper 实现注册中心的时候我们一定要问自己,如果把 ZooKeeper 所有节点全干掉,你生产上的所有服务调用链路能不受任何影响么?

7.你有没有 ZooKeeper 的专家可依靠?

  • 难以掌握的 Client/Session 状态机
  • 难以承受的异常处理

阿里巴巴是不是完全没有使用 ZooKeeper?并不是。熟悉阿里巴巴技术体系的都知道,其实阿里巴巴维护了目前国内最大规模的 ZooKeeper 集群,整体规模有近千台的 ZooKeeper 服务节点。 在粗粒度分布式锁分布式选主主备高可用切换等不需要高TPS 支持的场景下有不可替代的作用,而这些需求往往多集中在大数据、离线任务等相关的业务领域,因为大数据领域,讲究分割数据集,并且大部分时间分任务多进程/线程并行处理这些数据集,但是总是有一些点上需要将这些任务和进程统一协调,这时候就是 ZooKeeper 发挥巨大作用的用武之地。 但是在交易场景交易链路上,在主业务数据存取,大规模服务发现、大规模健康监测等方面有天然的短板,应该竭力避免在这些场景下引入 ZooKeeper,在阿里巴巴的生产实践中,应用对 ZooKeeper 申请使用的时候要进行严格的场景、容量、SLA 需求的评估。 总体来说,对于 ZooKeeper,大数据使用,服务发现不用。

# 6.RMQ 消息重试 msg id 会变吗

在 RabbitMQ 中,当消息被重新排队时,消息 ID 不会发生变化。当消息未被确认并被重新排队时,消息将保持其原始 ID,以便可以跟踪其状态并进行相应的处理。因此,即使消息被重新排队,您也可以使用相同的消息 ID 跟踪消息并进行处理。

当消息被重新排队时,RabbitMQ 会增加消息的delivery tag,以便与之前的 delivery tag 区分。Delivery tag 是一个整数,用于标识通道上单个传递的消息的唯一标识符。每次传递消息时,都会分配新的 delivery tag。

如果消息被重新排队多次,则每次重新排队都会增加 delivery tag 的值。这样可以确保在处理消息时不会与之前处理过的消息混淆。您可以使用 delivery tag 跟踪消息的状态和进度,并在需要时进行确认或拒绝消息。

# 7.kafka 与 RocketMQ 日志文件

Kafka 和 RocketMQ 都是流行的分布式消息中间件,它们在日志文件方面有一些相似和不同的特点。以下是 Kafka 和 RocketMQ 的日志文件对比:

Kafka:

  1. **日志文件类型:**Kafka 使用一种称为 "日志分段"(Log Segments)的文件格式。每个主题分区都有一个或多个日志分段,每个日志分段包含一定时间范围内的消息。

  2. **消息存储:**Kafka 将消息以追加写入的方式存储在每个日志分段中。每个分段都有一个偏移量范围,表示存储的消息范围。

  3. **日志维护:**Kafka 支持消息的保留策略,可以根据时间或大小来保留特定的日志分段。旧的日志分段会被定期清理和删除。

  4. **压缩:**Kafka 支持消息的压缩存储,可以在写入时进行压缩,以减少磁盘占用和网络传输带宽。

  5. 文件: index 文件,log 文件,timeindex 文件

RocketMQ:

  1. 日志文件类型: RocketMQ 采用类似传统数据库的写日志(Write-Ahead Logging,WAL)机制,将消息记录写入 CommitLog 文件中。

  2. 消息存储: RocketMQ 将消息记录追加写入 CommitLog 文件中,每个消息都对应一个唯一的物理偏移量。

  3. 消息索引: RocketMQ 在写入 CommitLog 文件的同时,还会创建索引文件以加速消息的查询和检索。索引文件支持根据消息的 Key 和 Tag 进行快速查找。

  4. 日志刷盘: RocketMQ 采用了异步刷盘策略,定期将内存中的消息刷写到磁盘,以提高性能。同时,RocketMQ 支持同步刷盘策略来确保消息的可靠存储。

  5. 日志维护: RocketMQ 支持消息的保留策略,可以根据时间或大小来保留消息。旧的 CommitLog 文件会被定期清理和删除。

Kafka 和 RocketMQ 都使用日志文件来存储消息,但在实现细节上有一些差异。Kafka 更专注于日志分段和消息的追加写入,而 RocketMQ 则结合了 WAL("Write-Ahead Logging"预写式日志) 和索引机制,以提供更高的消息查询性能和消费吞吐量。选择适合自己应用需求的消息中间件需要考虑这些特点和优势。

# 八.MQ 对比

# 1.MQ 技术选型?

Kafka

优点 缺点 使用场景
Kafka 的吞吐量几乎是行业里最优秀的,单机十几万 QPS Kafka 比较为人诟病的一点,似乎是丢数据方面的问题,接收到消息是落到磁盘缓冲区而不是磁盘中 而且一般量特别大,要求吞吐量要高,一般就是收发消息,不需要太多的高级功能
Kafka 性能也很高,基本上发送消息给 Kafka 都是毫秒级的性能 功能比较单一,只有收发消息 大数据日志采集系统
可用性也很高,Kafka 是可以支持集群部署的,其中部分机器宕机是可以继续运行的。

RabbitMQ

优点 缺点 使用场景
消息高可用,100%投递成功 QPS 不高,每秒万级 中小型公司
集群高可用 集群横向扩展十分麻烦 不需要大规模集群
功能丰富延迟队列死信队列 源码是 erlang,很难在源码基础上二次开发

RocketMQ

优点 缺点 使用场景
极高吞吐量 QPS 可达十几万 文档不太齐全 高并发互联网公司
消息高可用 复杂场景需求
可部署大规模集群
有多种高级功能,事务消息,延迟消息等
Java 语言开发,可二次修改

三者对比:

image-20230104172624024

# 2.资料文档

Kafka:中。有 kafka 作者自己写的书,网上资料也有一些。

rabbitmq:多。有一些不错的书,网上资料多。

rocketmq:少。没有专门写 rocketmq 的书,网上的资料良莠不齐,官方文档很简洁,但是对技术细节没有过多的描述。

# 3.开发语言

Kafka:Scala

rabbitmq:Erlang

rocketmq:java

# 4.支持协议

Kafka:自己定义的一套

rabbitmq:AMQP

rocketmq:自己定义的一套协议

# 5.消息存储

Kafka:内存、磁盘、数据库。支持大量堆积。

kafka 的最小存储单元是分区,一个 topic 包含多个分区,kafka 创建主题时,这些分区会被分配在多个服务器上,通常一个 broker 一台服务器。分区首领会均匀地分布在不同的服务器上,分区副本也会均匀的分布在不同的服务器上,确保负载均衡和高可用性,当新的 broker 加入集群的时候,部分副本会被移动到新的 broker 上。根据配置文件中的目录清单,kafka 会把新的分区分配给目录清单里分区数最少的目录。默认情况下,分区器使用轮询算法把消息均衡地分布在同一个主题的不同分区中,对于发送时指定了 key 的情况,会根据 key 的 hashcode 取模后的值存到对应的分区中。

rabbitmq:内存、磁盘。支持少量堆积。

rabbitmq 的消息分为持久化的消息和非持久化消息,不管是持久化的消息还是非持久化的消息都可以写入到磁盘。持久化的消息在到达队列时就写入到磁盘,并且如果可以,持久化的消息也会在内存中保存一份备份,这样可以提高一定的性能,当内存吃紧的时候会从内存中清除。非持久化的消息一般只存在于内存中,在内存吃紧的时候会被换入到磁盘中,以节省内存。

引入镜像队列机制,可将重要队列“复制”到集群中的其他 broker 上,保证这些队列的消息不会丢失。配置镜像的队列,都包含一个主节点 master 和多个从节点 slave,如果 master 失效,加入时间最长的 slave 会被提升为新的 master,除发送消息外的所有动作都向 master 发送,然后由 master 将命令执行结果广播给各个 slave,rabbitmq 会让 master 均匀地分布在不同的服务器上,而同一个队列的 slave 也会均匀地分布在不同的服务器上,保证负载均衡和高可用性。

rocketmq:磁盘。支持大量堆积。

commitLog 文件存放实际的消息数据,每个 commitLog 上限是 1G,满了之后会自动新建一个 commitLog 文件保存数据。ConsumeQueue 队列只存放 offset、size、tagcode,非常小,分布在多个 broker 上。ConsumeQueue 相当于 CommitLog 的索引文件,消费者消费时会从 consumeQueue 中查找消息在 commitLog 中的 offset,再去 commitLog 中查找元数据。ConsumeQueue 存储格式的特性,保证了写过程的顺序写盘(写 CommitLog 文件),大量数据 IO 都在顺序写同一个 commitLog,满 1G 了再写新的。加上 rocketmq 是累计 4K 才强制从 PageCache 中刷到磁盘(缓存),所以高并发写性能突出。

# 6.消息事务

Kafka:支持

rabbitmq:支持。客户端将信道设置为事务模式,只有当消息被 rabbitMq 接收,事务才能提交成功,否则在捕获异常后进行回滚。使用事务会使得性能有所下降

rocketmq:支持

# 7.负载均衡

Kafka:支持负载均衡。

  • 一个 broker 通常就是一台服务器节点。对于同一个 Topic 的不同分区,Kafka 会尽力将这些分区分布到不同的 Broker 服务器上,zookeeper 保存了 broker、主题和分区的元数据信息。分区首领会处理来自客户端的生产请求,kafka 分区首领会被分配到不同的 broker 服务器上,让不同的 broker 服务器共同分担任务。

  • 每一个 broker 都缓存了元数据信息,客户端可以从任意一个 broker 获取元数据信息并缓存起来,根据元数据信息知道要往哪里发送请求。

  • kafka 的消费者组订阅同一个 topic,会尽可能地使得每一个消费者分配到相同数量的分区,分摊负载。

  • 当消费者加入或者退出消费者组的时候,还会触发再均衡,为每一个消费者重新分配分区,分摊负载。

  • kafka 的负载均衡大部分是自动完成的,分区的创建也是 kafka 完成的,隐藏了很多细节,避免了繁琐的配置和人为疏忽造成的负载问题。

  • 发送端由 topic 和 key 来决定消息发往哪个分区,如果 key 为 null,那么会使用轮询算法将消息均衡地发送到同一个 topic 的不同分区中。如果 key 不为 null,那么会根据 key 的 hashcode 取模计算出要发往的分区。

rabbitmq:对负载均衡的支持不好。

  • 消息被投递到哪个队列是由交换器和 key 决定的,交换器、路由键、队列都需要手动创建。

  • rabbitmq 客户端发送消息要和 broker 建立连接,需要事先知道 broker 上有哪些交换器,有哪些队列。通常要声明要发送的目标队列,如果没有目标队列,会在 broker 上创建一个队列,如果有,就什么都不处理,接着往这个队列发送消息。假设大部分繁重任务的队列都创建在同一个 broker 上,那么这个 broker 的负载就会过大。(可以在上线前预先创建队列,无需声明要发送的队列,但是发送时不会尝试创建队列,可能出现找不到队列的问题,rabbitmq 的备份交换器会把找不到队列的消息保存到一个专门的队列中,以便以后查询使用)

  • 使用镜像队列机制建立 rabbitmq 集群可以解决这个问题,形成 master-slave 的架构,master 节点会均匀分布在不同的服务器上,让每一台服务器分摊负载。slave 节点只是负责转发,在 master 失效时会选择加入时间最长的 slave 成为 master。

  • 当新节点加入镜像队列的时候,队列中的消息不会同步到新的 slave 中,除非调用同步命令,但是调用命令后,队列会阻塞,不能在生产环境中调用同步命令。

  • 当 rabbitmq 队列拥有多个消费者的时候,队列收到的消息将以轮询的分发方式发送给消费者。每条消息只会发送给订阅列表里的一个消费者,不会重复。

  • 这种方式非常适合扩展,而且是专门为并发程序设计的。

  • 如果某些消费者的任务比较繁重,那么可以设置 basicQos 限制信道上消费者能保持的最大未确认消息的数量,在达到上限时,rabbitmq 不再向这个消费者发送任何消息。

  • 对于 rabbitmq 而言,客户端与集群建立的 TCP 连接不是与集群中所有的节点建立连接,而是挑选其中一个节点建立连接。但是 rabbitmq 集群可以借助 HAProxy、LVS 技术,或者在客户端使用算法实现负载均衡,引入负载均衡之后,各个客户端的连接可以分摊到集群的各个节点之中。

客户端均衡算法

  1. 轮询法。按顺序返回下一个服务器的连接地址。
  2. 加权轮询法。给配置高、负载低的机器配置更高的权重,让其处理更多的请求;而配置低、负载高的机器,给其分配较低的权重,降低其系统负载。
  3. 随机法。随机选取一个服务器的连接地址。
  4. 加权随机法。按照概率随机选取连接地址。
  5. 源地址哈希法。通过哈希函数计算得到的一个数值,用该数值对服务器列表的大小进行取模运算。
  6. 最小连接数法。动态选择当前连接数最少的一台服务器的连接地址。

rocketmq:支持负载均衡。

  • 一个 broker 通常是一个服务器节点,broker 分为 master 和 slave,master 和 slave 存储的数据一样,slave 从 master 同步数据。

  • nameserver 与每个集群成员保持心跳,保存着 Topic-Broker 路由信息,同一个 topic 的队列会分布在不同的服务器上。

  • 发送消息通过轮询队列的方式发送,每个队列接收平均的消息量。发送消息指定 topic、tags、keys,无法指定投递到哪个队列(没有意义,集群消费和广播消费跟消息存放在哪个队列没有关系)。

  • tags 选填,类似于 Gmail 为每封邮件设置的标签,方便服务器过滤使用。目前只支 持每个消息设置一个 tag,所以也可以类比为 Notify 的 MessageType 概念。

  • keys 选填,代表这条消息的业务关键词,服务器会根据 keys 创建哈希索引,设置后, 可以在 Console 系统根据 Topic、Keys 来查询消息,由于是哈希索引,请尽可能 保证 key 唯一,例如订单号,商品 Id 等。

  • rocketmq 的负载均衡策略规定:Consumer 数量应该小于等于 Queue 数量,如果 Consumer 超过 Queue 数量,那么多余的 Consumer 将不能消费消息。这一点和 kafka 是一致的,rocketmq 会尽可能地为每一个 Consumer 分配相同数量的队列,分摊负载。

# 8.集群方式

Kafka:天然的‘Leader-Slave’无状态集群,每台服务器既是 Master 也是 Slave。

  • 分区首领均匀地分布在不同的 kafka 服务器上,分区副本也均匀地分布在不同的 kafka 服务器上,所以每一台 kafka 服务器既含有分区首领,同时又含有分区副本,每一台 kafka 服务器是某一台 kafka 服务器的 Slave,同时也是某一台 kafka 服务器的 leader。

  • kafka 的集群依赖于 zookeeper,zookeeper 支持热扩展,所有的 broker、消费者、分区都可以动态加入移除,而无需关闭服务,与不依靠 zookeeper 集群的 mq 相比,这是最大的优势。

rabbitmq:支持简单集群,'复制'模式,对高级集群模式支持不好。

  • rabbitmq 的每一个节点,不管是单一节点系统或者是集群中的一部分,要么是内存节点,要么是磁盘节点,集群中至少要有一个是磁盘节点。

  • 在 rabbitmq 集群中创建队列,集群只会在单个节点创建队列进程和完整的队列信息(元数据、状态、内容),而不是在所有节点上创建。引入镜像队列,可以避免单点故障,确保服务的可用性,但是需要人为地为某些重要的队列配置镜像。

rocketmq:常用 多对'Master-Slave' 模式,开源版本需手动切换 Slave 变成 Master

  • Name Server 是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。

  • Broker 部署相对复杂,Broker 分为 Master 与 Slave,一个 Master 可以对应多个 Slave,但是一个 Slave 只能对应一个 Master,Master 与 Slave 的对应关系通过指定相同的 BrokerName,不同的 BrokerId 来定义,BrokerId 为 0 表示 Master,非 0 表示 Slave。Master 也可以部署多个。每个 Broker 与 Name Server 集群中的所有节点建立长连接,定时注册 Topic 信息到所有 Name Server。

  • Producer 与 Name Server 集群中的其中一个节点(随机选择)建立长连接,定期从 Name Server 取 Topic 路由信息,并向提供 Topic 服务的 Master 建立长连接,且定时向 Master 发送心跳。Producer 完全无状态,可集群部署。

  • Consumer 与 Name Server 集群中的其中一个节点(随机选择)建立长连接,定期从 Name Server 取 Topic 路由信息,并向提供 Topic 服务的 Master、Slave 建立长连接,且定时向 Master、Slave 发送心跳。Consumer 既可以从 Master 订阅消息,也可以从 Slave 订阅消息,订阅规则由 Broker 配置决定。

  • 客户端先找到 NameServer, 然后通过 NameServer 再找到 Broker。

  • 一个 topic 有多个队列,这些队列会均匀地分布在不同的 broker 服务器上。rocketmq 队列的概念和 kafka 的分区概念是基本一致的,kafka 同一个 topic 的分区尽可能地分布在不同的 broker 上,分区副本也会分布在不同的 broker 上。

  • rocketmq 集群的 slave 会从 master 拉取数据备份,master 分布在不同的 broker 上。

# 9.管理界面

  • Kafka:一般
  • rabbitmq:好
  • rocketmq:无

# 10.可用性

  • Kafka:非常高(分布式)
  • rabbitmq:高(主从)
  • rocketmq:非常高(分布式)

# 11.消息重复

  • Kafka:支持 at least once、at most once
  • rabbitmq:支持 at least once、at most once
  • rocketmq:支持 at least once

# 12.吞吐量 TPS

  • Kafka:极大 Kafka 按批次发送消息和消费消息。发送端将多个小消息合并,批量发向 Broker,消费端每次取出一个批次的消息批量处理。
  • rabbitmq:比较大
  • rocketmq:大 rocketMQ 接收端可以批量消费消息,可以配置每次消费的消息数,但是发送端不是批量发送。

# 13.订阅形式和消息分发

Kafka:基于 topic 以及按照 topic 进行正则匹配的发布订阅模式。

**【发送】**发送端由 topic 和 key 来决定消息发往哪个分区,如果 key 为 null,那么会使用轮询算法将消息均衡地发送到同一个 topic 的不同分区中。如果 key 不为 null,那么会根据 key 的 hashcode 取模计算出要发往的分区。

【接收】

  • consumer 向群组协调器 broker 发送心跳来维持他们和群组的从属关系以及他们对分区的所有权关系,所有权关系一旦被分配就不会改变除非发生再均衡(比如有一个 consumer 加入或者离开 consumer group),consumer 只会从对应的分区读取消息。

  • kafka 限制 consumer 个数要少于分区个数,每个消息只会被同一个 Consumer Group 的一个 consumer 消费(非广播)。

  • kafka 的 Consumer Group 订阅同一个 topic,会尽可能地使得每一个 consumer 分配到相同数量的分区,不同 Consumer Group 订阅同一个主题相互独立,同一个消息会被不同的 Consumer Group 处理。

rabbitmq:提供了 4 种:direct, topic ,Headers 和 fanout。

【发送】

  • 先要声明一个队列,这个队列会被创建或者已经被创建,队列是基本存储单元。由 exchange 和 key 决定消息存储在哪个队列。

  • direct>发送到和 bindingKey 完全匹配的队列。

  • topic>路由 key 是含有"."的字符串,会发送到含有“*”、“#”进行模糊匹配的 bingKey 对应的队列。

  • fanout>与 key 无关,会发送到所有和 exchange 绑定的队列

  • headers>与 key 无关,消息内容的 headers 属性(一个键值对)和绑定键值对完全匹配时,会发送到此队列。此方式性能低一般不用

【接收】

  • rabbitmq 的队列是基本存储单元,不再被分区或者分片,对于我们已经创建了的队列,消费端要指定从哪一个队列接收消息。

  • 当 rabbitmq 队列拥有多个消费者的时候,队列收到的消息将以轮询的分发方式发送给消费者。每条消息只会发送给订阅列表里的一个消费者,不会重复。

  • 这种方式非常适合扩展,而且是专门为并发程序设计的。

  • 如果某些消费者的任务比较繁重,那么可以设置 basicQos 限制信道上消费者能保持的最大未确认消息的数量,在达到上限时,rabbitmq 不再向这个消费者发送任何消息。

rocketmq:基于 topic/messageTag 以及按照消息类型、属性进行正则匹配的发布订阅模式

【发送】

  • 发送消息通过轮询队列的方式发送,每个队列接收平均的消息量。发送消息指定 topic、tags、keys,无法指定投递到哪个队列(没有意义,集群消费和广播消费跟消息存放在哪个队列没有关系)。

  • tags 选填,类似于 Gmail 为每封邮件设置的标签,方便服务器过滤使用。目前只支 持每个消息设置一个 tag,所以也可以类比为 Notify 的 MessageType 概念。

  • keys 选填,代表这条消息的业务关键词,服务器会根据 keys 创建哈希索引,设置后, 可以在 Console 系统根据 Topic、Keys 来查询消息,由于是哈希索引,请尽可能 保证 key 唯一,例如订单号,商品 Id 等。

【接收】

  • 广播消费。一条消息被多个 Consumer 消费,即使 Consumer 属于同一个 ConsumerGroup,消息也会被 ConsumerGroup 中的每个 Consumer 都消费一次。
  • 集群消费。一个 Consumer Group 中的 Consumer 实例平均分摊消费消息。例如某个 Topic 有 9 条消息,其中一个 Consumer Group 有 3 个实例,那么每个实例只消费其中的 3 条消息。即每一个队列都把消息轮流分发给每个 consumer。

# 14.顺序消息

Kafka:支持。

设置生产者的 max.in.flight.requests.per.connection 为 1,可以保证消息是按照发送顺序写入服务器的,即使发生了重试。kafka 保证同一个分区里的消息是有序的,但是这种有序分两种情况

  • key 为 null,消息逐个被写入不同主机的分区中,但是对于每个分区依然是有序的

  • key 不为 null , 消息被写入到同一个分区,这个分区的消息都是有序。

rabbitmq:不支持

rocketmq:支持

# 15.消息确认

Kafka:支持。

发送方确认机制

  • ack=0,不管消息是否成功写入分区

  • ack=1,消息成功写入首领分区后,返回成功

  • ack=all,消息成功写入所有分区后,返回成功。

接收方确认机制

自动或者手动提交分区偏移量,早期版本的 kafka 偏移量是提交给 Zookeeper 的,这样使得 zookeeper 的压力比较大,更新版本的 kafka 的偏移量是提交给 kafka 服务器的,不再依赖于 zookeeper 群组,集群的性能更加稳定。

rabbitmq:支持。

  • 发送方确认机制,消息被投递到所有匹配的队列后,返回成功。如果消息和队列是可持久化的,那么在写入磁盘后,返回成功。支持批量确认和异步确认。

  • 接收方确认机制,设置 autoAck 为 false,需要显式确认,设置 autoAck 为 true,自动确认。

  • 当 autoAck 为 false 的时候,rabbitmq 队列会分成两部分,一部分是等待投递给 consumer 的消息,一部分是已经投递但是没收到确认的消息。如果一直没有收到确认信号,并且 consumer 已经断开连接,rabbitmq 会安排这个消息重新进入队列,投递给原来的消费者或者下一个消费者。

  • 未确认的消息不会有过期时间,如果一直没有确认,并且没有断开连接,rabbitmq 会一直等待,rabbitmq 允许一条消息处理的时间可以很久很久。

rocketmq:支持。

消息回溯

  • Kafka:支持指定分区 offset 位置的回溯
  • rabbitmq:不支持
  • rocketmq:支持指定时间点的回溯

# 16.消息重试

Kafka:不支持,但是可以实现。kafka 支持指定分区 offset 位置的回溯,可以实现消息重试。

rabbitmq:不支持,但是可以利用消息确认机制实现。rabbitmq 接收方确认机制,设置 autoAck 为 false

当 autoAck 为 false 的时候,rabbitmq 队列会分成两部分,一部分是等待投递给 consumer 的消息,一部分是已经投递但是没收到确认的消息。如果一直没有收到确认信号,并且 consumer 已经断开连接,rabbitmq 会安排这个消息重新进入队列,投递给原来的消费者或者下一个消费者。

rocketmq:支持

消息消费失败的大部分场景下,立即重试 99%都会失败,所以 rocketmq 的策略是在消费失败时定时重试,每次时间间隔相同。

发送端的 send 方法本身支持内部重试,重试逻辑如下:

  1. 至多重试 3 次;

  2. 如果发送失败,则轮转到下一个 broker;

  3. 这个方法的总耗时不超过 sendMsgTimeout 设置的值,默认 10s,超过时间不在重试。

接收端

Consumer 消费消息失败后,要提供一种重试机制,令消息再消费一次。Consumer 消费消息失败通常可以分为以下两种情况:

  • 由于消息本身的原因,例如反序列化失败,消息数据本身无法处理(例如话费充值,当前消息的手机号被注销,无法充值)等。定时重试机制,比如过 10s 秒后再重试。

  • 由于依赖的下游应用服务不可用,例如 db 连接不可用,外系统网络不可达等。

即使跳过当前失败的消息,消费其他消息同样也会报错。这种情况可以 sleep 30s,再消费下一条消息,减轻 Broker 重试消息的压力。

# 17.并发度

Kafka:高

一个线程一个消费者,kafka 限制消费者的个数要小于等于分区数,如果要提高并行度,可以在消费者中再开启多线程,或者增加 consumer 实例数量。

rabbitmq:极高

  • 本身是用 Erlang 语言写的,并发性能高。

  • 可在消费者中开启多线程,最常用的做法是一个 channel 对应一个消费者,每一个线程把持一个 channel,多个线程复用 connection 的 tcp 连接,减少性能开销。

  • 当 rabbitmq 队列拥有多个消费者的时候,队列收到的消息将以轮询的分发方式发送给消费者。每条消息只会发送给订阅列表里的一个消费者,不会重复。

  • 这种方式非常适合扩展,而且是专门为并发程序设计的。

  • 如果某些消费者的任务比较繁重,那么可以设置 basicQos 限制信道上消费者能保持的最大未确认消息的数量,在达到上限时,rabbitmq 不再向这个消费者发送任何消息。

rocketmq:高

rocketmq 限制消费者的个数少于等于队列数,但是可以在消费者中再开启多线程,这一点和 kafka 是一致的,提高并行度的方法相同。修改消费并行度方法

  • 同一个 ConsumerGroup 下,通过增加 Consumer 实例数量来提高并行度,超过订阅队列数的 Consumer 实例无效。

  • 提高单个 Consumer 的消费并行线程,通过修改参数 consumeThreadMin、consumeThreadMax

同一个网络连接 connection,客户端多个线程可以同时发送请求,连接会被复用,减少性能开销。

上次更新: 11/26/2024, 10:00:19 PM