# 一.认识 kafka

# 1.kafka 的定义?

Kafka 传统定义:Kafka 是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。发布/订阅:消息的发布者不会将消息直接发送给特定的订阅者,而是将发布的消息分为不同的类别,订阅者只接收感兴趣的消息。

Kafka 最新定义: Kafka 是一个开源的分布式事件流平台( Event Streaming Platform),被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用。

# 2.kafka 应用场景?

  • 消息系统
  • 储存系统
  • 流式处理平台

消息系统: Kafka 和传统的消息系统(也称作消息中间件)都具备系统解耦冗余存储流量削峰缓冲异步通信扩展性可恢复性等功能。与此同时,Kafka 还提供了大多数消息系统难以实现的消息顺序性保障及回溯消费的功能。

存储系统: Kafka 把消息持久化到磁盘,相比于其他基于内存存储的系统而言,有效地降低了数据丢失的风险。也正是得益于 Kafka 的消息持久化功能和多副本机制,我们可以把 Kafka 作为长期的数据存储系统来使用,只需要把对应的数据保留策略设置为“永久”或启用主题的日志压缩功能即可。

流式处理平台: Kafka 不仅为每个流行的流式处理框架提供了可靠的数据来源,还提供了一个完整的流式处理类库,比如窗口、连接、变换和聚合等各类操作。

# 3.为什么不使用 Flume?

Kafka 更侧重于数据的存储以及流数据的实时处理,是一个追求高吞吐量、高负载的消息队列。

Flume 则是侧重于数据的采集和传输,提供了很多种接口支持多种数据源的采集,但是 Flume 并不直接提供数据的持久化。

就吞吐量和稳定性来说,Flume 不如 Kafka。所以在使用场景上,如果你需要在两个数据生产和消费速度不同的系统之间传输数据,比如实时产生的数据生产速度会经常发生变化,时段不同会有不同的峰值,如果直接写入 HDFS 可能会发生拥堵,在这种过程中加入 Kafka,就可以先把数据写入 Kafka,再用 Kafka 传输给下游。而对于 Flume 则是提供了更多封装好的组件,也更加轻量级,最常用于日志的采集,省去了很多自己编写代码的工作。

# 4.命令行工具

Kafka 的命令行工具在 Kafka 包的/bin目录下,主要包括服务和集群管理脚本,配置脚本,信息查看脚本,Topic 脚本,客户端脚本等。

  • kafka-configs.sh:配置管理脚本
  • kafka-console-consumer.sh:kafka 消费者控制台
  • kafka-console-producer.sh:kafka 生产者控制台
  • kafka-consumer-groups.sh:kafka 消费者组相关信息
  • kafka-delete-records.sh:删除低水位的日志文件
  • kafka-log-dirs.sh:kafka 消息日志目录信息
  • kafka-mirror-maker.sh:不同数据中心 kafka 集群复制工具
  • kafka-preferred-replica-election.sh:触发 preferred replica 选举
  • kafka-producer-perf-test.sh:kafka 生产者性能测试脚本
  • kafka-reassign-partitions.sh:分区重分配脚本
  • kafka-replica-verification.sh:复制进度验证脚本
  • kafka-server-start.sh:启动 kafka 服务
  • kafka-server-stop.sh:停止 kafka 服务
  • kafka-topics.sh:topic 管理脚本
  • kafka-verifiable-consumer.sh:可检验的 kafka 消费者
  • kafka-verifiable-producer.sh:可检验的 kafka 生产者
  • zookeeper-server-start.sh:启动 zk 服务
  • zookeeper-server-stop.sh:停止 zk 服务
  • zookeeper-shell.sh:zk 客户端

我们通常可以使用kafka-console-consumer.shkafka-console-producer.sh脚本来测试 Kafka 生产和消费,kafka-consumer-groups.sh可以查看和管理集群中的 Topic,kafka-topics.sh通常用于查看 Kafka 的消费组情况。

# 5.kafka 组件

Apache Kafka 是一个开源的流数据平台,用于构建实时数据流应用和数据管道。它具有高吞吐量、可扩展性和持久性的特点,适用于处理大规模的实时数据流。Kafka 由多个组件组成,下面是一些重要的 Kafka 组件:

  1. Producer(生产者):生产者将数据发布到 Kafka 集群,将数据记录推送到一个或多个 Kafka 主题(topic)。

  2. Broker:Kafka 集群由多个 Broker 组成,每个 Broker 是一个独立的服务器,负责存储数据和处理数据请求。

  3. Consumer(消费者):消费者从 Kafka 主题中读取数据记录,并进行处理。消费者可以以组的形式存在,每个组内的消费者共享主题的数据,从而实现负载均衡和高可用性。

  4. Topic:主题是数据记录的逻辑容器,生产者将数据发布到主题,消费者从主题中读取数据。每个主题可以有多个分区(partitions),分区允许数据水平扩展。

  5. Partition:分区是主题的物理分隔,每个分区都是有序且持久化的日志,生产者将数据写入分区,消费者从分区读取数据。

  6. Zookeeper:虽然不是 Kafka 内部的一部分,但 Kafka 高度依赖于 Apache ZooKeeper 来进行集群管理、协调和维护元数据信息。

  7. Connect:Kafka Connect 是一个可扩展的工具和框架,用于将 Kafka 与外部数据源和数据接收器连接起来,从而实现数据的导入和导出。

  8. Streams:Kafka Streams 是一个流处理库,允许开发者构建和处理实时数据流应用,可以进行数据转换、聚合、计算等操作。

  9. ACLs(Access Control Lists):用于控制用户和客户端对 Kafka 主题和资源的访问权限。

这些组件共同构成了 Kafka 的核心架构,使其成为一个强大的实时数据流平台。请注意,Kafka 生态系统在不断发展,可能会有新的组件或功能添加到其中。

# 6.Kafka 中 zk 的作用?

在 Apache Kafka 中,Zookeeper(简称 zk)起着重要的作用,它是分布式开源的协调服务,主要用于管理和维护 Kafka 集群的元数据、配置信息以及集群中各个 Broker 的状态。以下是 Zookeeper 在 Kafka 中的几个关键作用:

  1. 集群协调与管理: Zookeeper 负责管理 Kafka 集群的整体结构,包括维护 Broker 的信息、Topic 的分区分配、消费者组的信息等。每个 Broker 都会注册到 Zookeeper,并且 Zookeeper 会监控 Broker 的状态变化,从而实现故障检测和自动恢复。

  2. 元数据存储: Kafka 中的元数据,如 Topic、分区、副本等信息,都存储在 Zookeeper 中。消费者可以通过查询 Zookeeper 获取最新的元数据,从而了解哪些 Topic 可用以及如何定位分区和副本。

  3. Leader 选举: 在 Kafka 集群中,每个分区都有一个 Leader 和若干个 Follower 副本。Zookeeper 协助实现了 Leader 的选举过程。当 Leader 副本失效时,Zookeeper 会协调 Follower 之间的选举过程,选择新的 Leader。

  4. 消费者组协调: Zookeeper 协助管理消费者组的状态和偏移量(offset)。消费者可以在 Zookeeper 中注册自己所属的消费者组,并且跟踪每个消费者的消费进度。

  5. 配置管理: Kafka 的一些配置参数也存储在 Zookeeper 中。当需要修改配置时,Kafka 会通过 Zookeeper 来实现分布式配置的更新。

需要注意的是,尽管 Zookeeper 在过去在 Kafka 中起到了重要作用,但自从 Kafka 版本 2.8.0 开始,Kafka 团队引入了一项称为“KRaft”的新的分布式协议,旨在取代 Zookeeper 作为 Kafka 的协调服务。这是一个逐步的过程,Kafka 用户可以选择继续使用 Zookeeper,或者在适当时机迁移到 KRaft 协议上。这个变化有助于简化 Kafka 集群的架构和维护。

# 二.生产者

# 1.发送流程

整个生产者客户端由两个线程协调运行,这两个线程分别为主线程Sender 线程(发送线程)。在主线程中由 KafkaProducer 创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator, 也称为消息收集器)中。

image-20221021164528468

# 2.sender 关键 2 个参数

Sender 线程负责从 RecordAccumulator 中获取消息并将其发送到 Kafka 中。

  • batch.size: 只有数据积累到 batch.size 之后,sender 才会发送数据。默认 16kb
  • linger.ms: 如果数据迟迟未达到 batch.size,sender 线程等待 linger.ms 设置的时间到了之后就会发送数据。单位 ms, 默认值是 0ms,表示没有延迟。
  • 这两个参数是提高生产者吞吐量的关键.

# 3.acks 说明

应答 acks:

应答设置 描述
0 生产者发送的数据不需要等数据落盘即应答。
1 生产者发送的数据,Leader 收到数据后就应答。
-1 (all) 生产者发送的数据,Leader 和 ISR 队列的所有节点收齐数据后应答。-1 和 all 等价。

# 4.生产者如何提高吞吐量?

配置项 默认值 修改建议
batch.size 16KB 修改为 32KB
linger.ms 0 修改为 5-100 毫秒
compression.type producer 修改为 snappy
RecordAccumulator 32MB 修改为 64MB

# 5.副本的种类?

缩写 全称 解释
AR Assigned Replicas 在 Kafka 集群中,每个分区都可以有多个副本,AR 指的是被分配到特定 Broker 上的分区副本。
ISR In-Sync Replicas ISR 指的是与分区的 Leader 副本保持同步的副本集合。Kafka 使用 ISR 来确保数据的可靠性。
OSR Out-of-Sync Replicas OSR 指的是与分区的 Leader 副本不保持同步的副本。Kafka 集群会努力将 OSR 副本带回同步状态。

Leader 收到数据,所有 Follower 都开始同步数据,但有一个 Follower,因为某种故障,迟迟不能与 Leader 进行同步,那这个问题怎么解决呢? Leader 维护了一个动态的 in-sync replica set (ISR) ,意为和 Leader 保持同步的 Follower+Leader集合(leader: 0, isr:0,1,2)。如果 Follower 长时间未向 Ieader 发送通信请求或同步数据,则该 Follower 将被踢出 ISR。该时间阈值由replica.lag.time.max.ms 参数设定,默认 30s。例如 2 超时,(leader:0, isr:0,1)。这样就不用等长期联系不上或者已经故障的节点。

# 6.保证数据完全可靠?

数据完全可靠条件= ACK 级别设置为ALL(-1)+ 分区副本大于等于2+ ISR里应答的最小副本数量大于等于2

可靠性总结:

acks 配置 描述 可靠性 效率
0 生产者发送的数据无需等待应答,可靠性较差,效率高。 较低
1 生产者发送的数据等待 Leader 应答,可靠性中等,效率中等。 中等 中等
-1/all 生产者发送的数据等待 Leader 和 ISR 中所有 Follower 应答,可靠性高,效率低。

在生产环境中,acks=0 很少使用; acks=1, 一般用于传输普通日志,允许丢个别数据; acks=-1, 一般用于传输和钱相关的数据,对可靠性要求比较高的场景。

# 7.如何解决数据重复?

至少一次和最多一次:

  • 至少一次(At Least Once) = ACK 级别设置为 all +分区副本大于等于 2 + ISR 里应答的最小副本数量大于等于 2
  • 最多一次(At Most Once) = ACK 级别设置为 0

总结:

  • At Least Once 可以保证数据不丢失,但是不能保证数据不重复;
  • At Most Once 可以保证数据不重复,但是不能保证数据不丢失。

# 8.精准一次

精确一次(Exactly Once) :对于一些非常重要的信息,比如和钱相关的数据,要求数据既不能重复也不丢失。

kafka 在 0.11 版本引入了幂等性和事务.

幂等性就是指 Producer 不论向 Broker 发送多少次重复数据,Broker 端都只会持久化一条,保证了不重复。

精确一次(Exactly Once) = 幂等性 + 至少 1 次( ack=-1 +分区副本数>=2 + ISR 最小副本数量>=2)

重复数据的判断标准:具有<PID, Partition, SeqNumber> 相同主键的消息提交时,Broker 只会持久化一条。其中 PID 是 Kafka 每次重启都会分配一个新的; Partition 表示分区号; Sequence Number 是单调自增的。 所以幂等性只能保证的是在单分区单会话内不重复

属性 描述
pid 每次 Kafka 重启时分配的唯一标识,用于消息判断。
分区号 指示消息所在的 Kafka 分区。
序列号 单调递增的标识,用于判断消息顺序和重复。

如何使用幂等性,开启参数 enable.idempotence 默认为 true,默认打开状态, false 关闭。

# 9.kafka 事务原理?

开启事务,必须开启幂等性.

  • Producer 在使用事务功能前,必须先自定义一个唯一的 transactional.id。 有了 transactional.id, 即使客户端挂掉了,它重启后也能继续处理未完成的事务
  • 默认有 50 个分区,每个分区负责一部分事务。事务划分是根据 transactional.id 的 hashcode 值%50,计算出该事务属于哪个分区。该分区 Leader 副本所在的 broker 节点即为这个 transactional.id 对应的 Transaction Coordinator 节点。

image-20231212224214239

事务的执行过程

参照上图,以下是 Kafka 事务的执行过程,按顺序列出了每个阶段的描述:

  1. 初始化:在事务处理开始之前,生产者应用程序需要初始化事务环境。这包括创建一个事务性生产者,并为事务分配一个唯一的事务 ID。

  2. 启动:一旦事务环境初始化完成,生产者应用程序可以开始启动事务。在事务启动后,生产者发送的所有消息都将被视为属于该事务。

  3. 消费 Offset:如果生产者需要在事务中使用消费者组来消费某些消息,它需要首先从消费者组的 offset 开始消费。这些消费的消息也会被包含在当前事务中。

  4. 提交:生产者将一系列消息发送到 Kafka 主题,这些消息将在事务中分组。一旦所有消息都被发送完成,生产者可以选择提交事务。提交事务会将所有发送的消息标记为来自于同一个事务,并将它们添加到 Kafka 日志中。如果提交成功,消息将被持久化,否则将回滚。

  5. 终止:在事务成功提交后,生产者应用程序可以选择终止事务。终止事务后,生产者可以启动新的事务或执行其他操作。

这些阶段描述了 Kafka 事务的基本执行过程。事务能够确保在分布式环境中进行可靠的消息处理,并提供了原子性和一致性保障。请注意,Kafka 事务的执行过程可能会因实际使用情况和配置而有所不同。

# 10.如何保证数据有序?

确保数据有序性在 Kafka 中通常需要考虑以下几点:

  1. 单分区:如果所有数据都发送到同一个分区(单分区),则数据将保持有序。这是因为 Kafka 会按照消息的顺序将其写入单个分区,因此来自同一个分区的消息将按照发送顺序保持有序。

  2. 多分区:如果数据被分发到多个分区,那么在 Kafka 内部,不同分区的数据可能会交错到不同的日志段中,因此不同分区之间的数据顺序不能得到保证。在这种情况下,可以在消费端进行排序。消费者可以通过使用消息中的时间戳或其他有序标识来对消息进行排序,从而确保消费时的有序性。但这种方式需要额外的处理,并可能导致一些性能损耗。

  3. 处理有序性问题:如果数据有序性对您的应用很重要,您可以采取以下措施来处理有序性问题:

    • 将相关的数据发送到同一个分区,这可以通过使用自定义分区器来实现。
    • 在消费端进行排序,可以使用消费者的缓冲区来对数据进行排序,或者在消费端应用中使用某种缓存和排序机制。

Kafka 在单分区的情况下可以保证数据有序,而在多分区的情况下,需要在消费端进行适当的排序处理。具体的实现方式取决于应用的需求和场景。

# 11.kafka 有序的原理

Kafka 版本 条件 原理描述
1.x 版本之前 max.in.flight.requests.per.connection=1 在单分区中保证有序。
1.x 及以后版本 未开启幂等性:max.in.flight.requests.per.connection 需设置为 1。 在单分区中保证有序。
开启幂等性:max.in.flight.requests.per.connection 需设置小于等于 5。 kafka 服务端缓存最近 5 个请求的元数据,保证最近 5 个请求的数据有序。

# 12.生产者异步发送

步骤 描述
1. 配置参数 - 连接 bootstrap-server 用于指定 Kafka 集群的地址。
- 配置 keyvalue 的序列化方式。
2. 创建生产者 使用 KafkaProducer<String, String>() 创建生产者实例。
3. 发送数据 使用 send() 方法发送数据到指定主题。
- 可以使用带有回调函数的 send() 方法来处理异步发送的结果。
4. 关闭资源 在使用完成后,调用 close() 方法关闭生产者资源。
public static void main(String[] args) {
    //1.配置文件
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091,localhost:9092,localhost:9093");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CompanySerializer.class);
    //2.创建生产者
    KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
    //3.发送数据
    for (int i = 0; i < 5; i++) {
      kafkaProducer.send(new ProducerRecord<>("first", "hello" + i)); //主题+value
    }
    //4.关闭资源
    kafkaProducer.close();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# 13.生产者同步发送

public static void main(String[] args) throws ExecutionException, InterruptedException {
  //1.配置文件
  Properties properties = new Properties();
  properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091,localhost:9092,localhost:9093");
  properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CompanySerializer.class);
  //2.创建生产者
  KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
  //3.发送数据
  for (int i = 0; i < 5; i++) {
    kafkaProducer.send(new ProducerRecord<>("first", "hello" + i)).get(); //主题+value
  }
  //4.关闭资源
  kafkaProducer.close();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# 14.自定义分区

表名作为 key 发到一个分区

public class BizPartitioner implements Partitioner {

  @Override
  public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    return 1;
  }

  @Override
  public void close() {

  }

  @Override
  public void configure(Map<String, ?> configs) {

  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class BizPartitionerTest {

  public static void main(String[] args) {
    Map<String, Object> configs = Maps.newHashMap();
    configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091,localhost:9092,localhost:9093");
    configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // 添加分区对应的业务分区,自定义实现类
    configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, BizPartitioner.class);
    // 创建生产者实例
    Producer producer = new KafkaProducer(configs);
    try {
      for (int i = 0; i < 10; i++) {
        // 创建消息
        ProducerRecord record = new ProducerRecord("Hello-Kafka", "hello kitty " + i + "!");
        // 发送并获取 发送结果
        RecordMetadata metadata = (RecordMetadata) producer.send(record).get();
        System.out.println(String.format("topic=%s, msg=%s, partition=%s, offset=%s, timestamp=%s",
                                         metadata.topic(), record.value(),
                                         metadata.partition(), metadata.offset(), metadata.timestamp()));
      }
    } catch (Exception e) {
      e.printStackTrace();
    }
    producer.close();
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

# 15.数据分区原则

注意:

  • 如果 key 不为 null, 那么计算得到的分区号会是所有分区中的任意一个;
  • 如果 key 为 null, 那么计算得到的分区号仅为可用分区中的任意一个,注意两者之间的差别。

消息被顺序追加到每个分区日志文件的尾部。Kafka 中的分区可以分布在不同的服务器(broker)上,也就是说,一个主题可以横跨多个 broker,以此来提供比单个 broker 更强大的性能。

ProducerRecord 的构造方法较多,也是处理业务需求的关键

  1. 指明 partition 的情况下,直接将指明的值作为 partition 值;例如 partition=0,所有数据写入分区 0

  2. 没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值;例如: key1 的 hash 值=5,key2 的 hash 值=6 ,topic 的 partition 数=2,那么 key1 对应的 value1 写入 1 号分区,key2 对 应的 value2 写入 0 号分区。

  3. 既没有 partition 值又没有 key 值的情况下,Kafka 采用 Sticky Partition ( 黏性分区器),会随机选择一个分区,并尽可能一直使用该分区,待该分区的 batch 已满或者已完成,Kafka 再随机一个分区进行使用(和上一次的分区不同)。例如:第一次随机选择 0 号分区, 等 0 号分区当前批次满了(默认 16k) 或者 linger.ms 设置的时间到,Kafka 再随机一个分区进行使用(如果还是 0 会继续随机)。

public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {
  this(topic, partition, timestamp, key, value, (Iterable)null);
}

public ProducerRecord(String topic, Integer partition, K key, V value) {
  this(topic, partition, (Long)null, key, value, (Iterable)null);
}
public ProducerRecord(String topic, K key, V value) {
  this(topic, (Integer)null, (Long)null, key, value, (Iterable)null);
}
public ProducerRecord(String topic, V value) {
  this(topic, (Integer)null, (Long)null, (Object)null, value, (Iterable)null);
}
1
2
3
4
5
6
7
8
9
10
11
12
13

# 16.生产者参数

配置参数 默认值 描述
acks 1 生产者等待 Leader 应答的配置值。
max.request.size 1MB 单个请求的最大消息大小。
retries 0 生产者请求失败时的重试次数。
retry.backoff.ms 100 两次重试尝试之间的等待时间(毫秒)。
compression.type none 消息压缩类型,例如 snappygzip
connections.max.idle.ms 9 分钟 连接空闲时的最大等待时间(毫秒)。
linger.ms 0 消息发送前的等待时间,以聚集更多消息。
receive.buffer.bytes 32KB Socket 接收缓冲区大小。
request.timeout.ms 30000ms 等待服务器响应的最大时间(毫秒)。

# 17.生产者是线程安全的吗?

Kafka 的 Java 生产者(Producer)是线程安全的。这意味着你可以在多个线程之间安全地共享一个生产者实例。实际上,Kafka 生产者内部有一个缓冲区和一个后台 I/O 线程,这样做有几个优点:

  1. 效率: 由于内部缓冲区和批处理机制,多个线程使用同一个生产者实例可以提高消息的发送效率。

  2. 简化配置管理: 与创建和管理多个生产者实例相比,使用单一实例简化了配置管理。

  3. 网络资源: 共享一个生产者实例意味着更有效地利用网络资源。

  4. 客户端内部优化: Kafka 生产者可以更好地优化内部的数据结构和网络调用,因为它具有全局的视野。

尽管生产者是线程安全的,但在多线程环境中使用单一生产者实例时还是需要注意一些问题:

  1. 错误处理: 如果一个线程在发送消息时遇到问题(比如,由于缓冲区满了或元数据获取失败等),你需要确保有合适的错误处理机制,这样不会影响其他线程。

  2. 阻塞和超时: 如果不正确地设置生产者的参数(比如 max.block.ms, buffer.memory 等),可能会导致生产者在某些条件下阻塞,这会影响所有使用该实例的线程。

Kafka 生产者是设计为线程安全的,适合在多线程环境中使用。然而,正确地配置和错误处理仍然是必要的。

# 三.消费者

# 1.kafka 消费模式?

消费方式 描述
拉(pull)模式 消费者(consumer)主动从 Kafka Broker 中拉取数据。
推(push)模式 Kafka Broker 主动将消息推送给消费者。然而,Kafka 实际上采用了拉模式,因为推送难以适应不同消费者的消费速率。

Kafka 采用拉模式的原因之一是确保消费者可以根据自身的处理能力和速率来消费数据,从而避免数据被推送过快导致的消费者无法处理的问题。

# 2.消息队列的两种模式?

模式 描述
点对点模式 - 一个生产者、一个消费者、一个主题。
- 消费者主动拉取数据,处理后清除消息。
发布/订阅模式 - 可以有多个主题(例如:浏览、点赞、收藏、评论等)。
- 消费者消费数据后不删除消息。
- 每个消费者相互独立,都可以消费数据。

这两种模式在 Kafka 中都得到支持,可以根据实际场景选择适合的模式来设计和实现应用程序。

如果所有的消费者都隶属于同一个消费组,那么所有的消息都会被均衡地投递给每一个消费者,即每条消息只会被一个消费者处理,这就相当于点对点模式的应用。 如果所有的消费者都隶属于不同的消费组,那么所有的消息都会被广播给所有的消费者,即每条消息会被所有的消费者处理,这就相当于发布/订阅模式的应用。

# 3.kafka 消费者总体流程

Consumer Group (CG):消费者组,由多个 consumer 组成。形成一个消费者组的条件,是所有消费者的 groupid 相同

  • 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费。
  • 消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。

image-20221027172958245

# 4.消费者组的初始化

coordinator:辅助实现消费者组的初始化和分区的分配。 coordinator 节点选择: groupidhashcode 值 %50(consumer_offsets 的分区数量) 例如:groupidhashcode 值= 1, 1%50=1,那么 consumer_offsets 主题的 1 号分区,在哪个 broker 上,就选择这个节点的 coordinator 作为这个消费者组的老大。消费者组下的所有的消费者提交 offset 的时候就往这个分区去提交 offset.

image-20221027173219104

# 5.消费者组的消费流程

image-20221027174306025

Kafka 消费者组的消费流程涉及多个组件和步骤,以下是消费者组的基本消费流程:

  1. 消费者组的创建和加入

    • Kafka 消费者可以组成消费者组,多个消费者可以属于同一个消费者组。
    • 消费者组在消费数据时可以实现负载均衡和高可用性。
  2. 分区分配

    • Kafka 主题被划分为多个分区,消费者组中的每个消费者会被分配一个或多个分区。
    • 消费者组协调器负责管理消费者和分区之间的分配。
  3. 消费者拉取数据

    • 消费者从分配给它的分区中拉取数据。
    • 消费者可以选择定期拉取数据,或者通过长轮询等方式等待新数据。
  4. 处理数据

    • 消费者处理拉取到的数据,可能会进行业务逻辑处理、数据转换等操作。
  5. 提交消费位移

    • 消费者在处理完数据后,需要将消费位移(offset)提交到 Kafka 服务器。
    • 提交位移的操作是原子性的,确保消费者下次从正确的位置继续消费。
  6. 重平衡(Rebalance)

    • 当消费者组内的消费者数量发生变化(例如有新消费者加入或旧消费者退出)时,会触发重平衡操作。
    • 重平衡会重新分配分区,确保消费者组内的每个消费者都获得一些分区进行消费。
  7. 循环消费

    • 消费者会反复执行拉取数据、处理数据、提交位移等步骤,实现循环消费。
  8. 关闭和退出

    • 消费者在不需要消费数据时,应调用关闭方法关闭消费者,释放资源。

以上流程大致描述了 Kafka 消费者组的消费流程。消费者组的机制使得多个消费者可以协同工作,高效地消费数据,并在消费者变化时自动进行分区重平衡。

# 6.常见消费场景

  1. 普通消费
  2. 多线程消费
  3. 指定分区消费
  4. 正则订阅
  5. 自定义反序列化,Deserializer
  6. 设置分区分配策略
  7. offset 自动提交,手动提交
  8. 指定 offset 进行消费
  9. 指定时间进行消费
#多线程消费KafkaConsumerProcess实现Runnable
new Thread(new KafkaConsumerProcess(properties, "Hello-Kafka"), "thread-one").start();

#分配对应的分区
consumer.assign(ImmutableList.of(new TopicPartition("Hello-Kafka", 0)));

#正则订阅主题
consumer.subscribe(Pattern.compile("Hello-.*"));

#自定义反序列化
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CompanyDeserializer.class.getName());

public class CompanyDeserializer implements Deserializer<Company>
@Override
public Company deserialize(String topic, byte[] data) {}


#设置分区分配策略
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StickyAssignor.class.getName());
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

# 7.消费者时间参数

配置参数 描述
session.timeout.ms group coordinator 检测消费者发生崩溃所需的时间。如果在指定时间内未收到消费者的心跳,则认为消费者已挂掉。阈值 45 秒
heartbeat.interval.ms 每个消费者定期向 group coordinator 发送心跳包,证明自己还活着。用于监测消费者的存活情况。默认 3 秒
max.poll.interval.ms 如果消费者两次拉取消息的间隔超过该时间,会被移出消费者组,触发 rebalance。用于检测消费者的消息处理能力。默认 5 分钟

以上配置参数用于 Kafka 消费者组维护和监控消费者的健康状态,包括心跳、超时、消息处理能力等。合理配置这些参数可以保证消费者组的稳定运行,避免不必要的 rebalance。

max.poll.interval.ms 设置不合理,会导致不停地 rebalance,一般会做一些失败的重试处理。比如通过线程池的 ThreadPoolExecutor#afterExecute()方法捕获到异常,再次提交 Runnable 任务重新订阅 kafka topic。本来消费处理需要很长的时间,如果某个 consumer 处理超时:消息处理逻辑的时长大于 max.poll.interval.ms (或者消息处理过程中发生了异常),被 coordinator 移出了 consumer 组,这时由于失败的重试处理,自动从线程池中拿出一个新线程作为消费者去订阅 topic,那么意味着有新消费者加入 group,就会引发 rebalance,而可悲的是:新的消费者还是来不及处理完所有消息,又被移出 group。如此循环,就发生了不停地 rebalance 的现象。

# 8.消费者线程

new KafkaConsumer 对象后,在 while true 循环中执行 consumer.poll 操作拉取消息中,会有两个线程执行:一个是 heartbeat 线程,另一个是 processing 线程。

  • processing 线程可理解为调用 consumer.poll 方法执行消息处理逻辑的线程,
  • heartbeat 线程是一个后台线程,对程序员是"隐藏不见"的。heartbeat 线程每隔 heartbeat.interval.ms 向 coordinator 发送一个心跳包,证明自己还活着。但是如果 group coordinator 在一个 heartbeat.interval.ms 周期内未收到 consumer 的心跳,就把该 consumer 移出 group,这有点说不过去。事实上,有可能网络延时,有可能 consumer 出现了一次长时间 GC,影响了心跳包的到达,说不定下一个 heartbeat 就正常了。而如果 group coordinator 在 session.timeout.ms 内未收到 consumer 的心跳,那就会把该 consumer 移出 group。
  • 而 max.poll.interval.ms 参数,在 consumer 两次 poll 操作间隔超过了这个时间,即 consumer 的消息处理逻辑时长超过了 max.poll.interval.ms,该消费者就会被提出消费者组。

# 9.分区分配策略

消费者组中的 consumer 是如何确定自己该消费哪些分区的数据的?

分区分配策略 描述
RoundRobin(轮询) 消费者按照在消费者列表中的顺序依次获取分区,实现均匀分配。
Range(按范围) 分区按照一定的范围进行划分,消费者组内的消费者依次分配这些范围内的分区。
Sticky 消费者尽量保持消费之前的分区分配,以保持一定的顺序性。适用于有状态的场景。
CooperativeSticky 改进的 Sticky 策略,智能管理分区分配,减少 rebalance 操作。

分区分配策略可以通过配置参数 partition.assignment.strategy 进行设置。默认情况下,Kafka 使用的是 Range+CooperativeSticky 策略,这个策略在一般情况下能够提供较好的分区负载均衡和消费性能。

# 10.分区重分配的条件

触发分区重分配的条件 描述
消费者加入或退出消费者组 新消费者加入或现有消费者退出消费者组时,会触发分区重分配。新消费者需要分配分区,退出的消费者的分区需要重新分配。
消费者失效或无法与协调器通信 若某消费者在一段时间内未能与协调器通信,协调器可能认为该消费者失效,从而触发分区重分配。
消费者心跳超时 每个消费者定期发送心跳信号给协调器,表示自己仍活跃。若协调器一定时间内未收到心跳信号,可能将消费者标记为失效,导致分区重分配。
分区数量发生变化 若 Kafka 主题的分区数量发生变化,例如扩容或缩容分区,将导致消费者组的分区分配发生变化,触发分区重分配。

分区重分配的目标是确保消费者组内的消费者分配到适当数量的分区,以实现消费的负载均衡和高效性。Kafka 提供了不同的分区分配策略,可以在一定程度上调整分区重分配的行为,以满足不同的需求和场景。

需要注意,分区重分配可能会在消费者组内引起短暂的不稳定情况,因此在设计应用程序时需要考虑如何处理这种情况,以确保消费的稳定性和正确性。

# 11.Range 策略

Range 策略是针对 topic 而言的,在进行分区分配时,为了尽可能保证所有 consumer 均匀的消费分区,会对同一个 topic 中的 partition 按照序号排序,并对 consumer 按照字典顺序排序。

然后为每个 consumer 划分固定的分区范围,如果不够平均分配,那么排序靠前的消费者会被多分配分区。具体就是将 partition 的个数除于 consumer 线程数来决定每个 consumer 线程消费几个分区。如果除不尽,那么前面几个消费者线程将会多分配分区。

#当存在有2个Kafka topic(t1和t2),它们都有10个partition,那么最后分区结果为:
C0-0 将消费t1主题的0123分区以及t2主题的0123分区
C1-0 将消费t1主题的456分区以及t2主题的456分区
C2-1 将消费t1主题的789分区以及t2主题的789分区
1
2
3
4

Range 策略的问题,topic 多了后,消费者分配不均,会出现部分消费者过载.数据倾斜.

Range 策略,某一个消费者挂了后,会全部分配到其他某一个分区,再次发送,会重新分配

# 12.RoundRobin 策略

RoundRobin 策略的工作原理:将所有 topic 的 partition 组成 TopicAndPartition 列表,然后对 TopicAndPartition 列表按照 hashCode 进行排序,最后按照 RoundRobin 风格将分区分别分配给不同的消费者。

使用 RoundRobin 策略必须满足以下条件:

  • 同一个 Consumer Group 里面的所有 consumer 的 num.streams 必须相等
  • 每个 consumer 订阅的 topic 必须相同

假设消费组 CG1 中有 C0 和 C1 两个 consumer 的 num.streams 都为 2。按照 hashCode 排序完的 topic-partition 组依次为 t1-5, t1-3, t1-0, t1-8, t1-2, t1-1, t1-4, t1-7, t1-6, t1-9,我们的消费者排序为 C0-0, C0-1, C1-0, C1-1,最后分区分配的结果为:

C0-0将消费t1-5、t1-2、t1-6分区
C0-1将消费t1-3、t1-1、t1-9分区
C1-0将消费t1-0、t1-4分区
C1-1将消费t1-8、t1-7分区
1
2
3
4

# 13.Sticky 分配策略

粘性策略它主要有两个目的:

  • 分区的分配要尽可能的均匀

  • 分区的分配尽可能的与上次分配的保持相同

当两者发生冲突时,第一个目标优先于第二个目标。鉴于这两个目标,StickyAssignor 策略的具体实现要比 RangeAssignor 和 RoundRobinAssignor 这两种分配策略要复杂很多。

# 14.CooperativeSticky 策略

CooperativeSticky 是 Kafka 新引入的一种分区分配策略,旨在提供更智能和平稳的分区重分配。它在消费者组的成员发生变化时,尤其是在新消费者加入或现有消费者退出时,能够更加灵活地管理分区分配,从而减少不必要的 rebalance 操作。

以下是 CooperativeSticky 分区分配策略的一些特点和优势:

  1. 智能性CooperativeSticky 策略会根据消费者的处理能力、负载和活跃性等因素,优化分区的分配。它会考虑到消费者的当前状态,尽量避免将分区从一个活跃的消费者移交给一个不活跃的消费者。

  2. 幂等性:该策略支持幂等性消费,即在发生重分配时,消费者不会重复处理之前已经处理过的消息。这有助于避免重复消费问题。

  3. 降低 Rebalance 影响CooperativeSticky 策略通过在分区分配变化时进行优雅的状态转移,降低了 rebalance 对消费者组的影响。它尽量避免不必要的重新分配,从而减少了消费者的中断。

  4. 动态加入和退出:在消费者加入或退出时,CooperativeSticky 策略会以更智能的方式重新分配分区,以保持消费者组的平衡。

  5. 适用性CooperativeSticky 策略适用于多种场景,尤其是在消费者组的动态变化频繁或者要求高可用性的情况下,能够更好地维护分区分配的稳定性。

通过使用 partition.assignment.strategy 参数将分区分配策略设置为 CooperativeSticky,您可以充分利用这个新策略带来的优势,提升 Kafka 消费者组的性能和可靠性。

# 15.offset 的自动提交

为了使我们能够专注于自己的业务逻辑,Kafka 提供了自动提交 offset 的功能。

自动提交 offset 的相关参数:

  • enable.auto.commit: 是否开启自动提交 offset 功能,默认是 true
  • auto.commit.interval.ms:自动提交 offset 的时间间隔,默认是 5s

# 16.offset 的手动提交

对于采用 commitSync () 的无参方法而言,它提交消费位移的频率和拉取批次消息、处理批次消息的频率是一样的,如果想寻求更细粒度的、更精准的提交,那么就需要使用 commitSync () 的另一个含参方法,具体定义如下:

指定 offset 进行提交

public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {}
1

# 17.指定 offset

auto.offset.reset = earliest | latest | none  默认是latest
1
重置偏移量选项 描述
earliest 将偏移量重置为最早的可用偏移量,即从主题的最早消息开始消费。
latest 将偏移量重置为最新的可用偏移量,即只消费新发布的消息。
none 如果没有找到消费者组的先前偏移量,将抛出异常,不允许消费。

当 Kafka 消费者组第一次消费或服务器上不再存在当前偏移量时,您可以根据上述选项之一来选择如何处理偏移量的重置,以确保消费者能够从适当的位置开始消费数据。

seek()方法为我们提供了从特定位置读取消息的能力,我们可以通过这个方法来向前跳过若干消息,也可以通过这个方法来向后回溯若干消息,这样为消息的消费提供了很大的灵活性。seek ()方法也为我们提供了将消费位移保存在外部存储介质中的能力,还可以配合再均衡监听器来提供更加精准的消费能力。

 public void seek(TopicPartition partition, long offset) {}
1

# 18.漏消费和重复消费

  • 重复消费:已经消费了数据,但是 offset 没提交。
  • 漏消费:先提交 offset 后消费,有可能会造成数据的漏消费。

重复消费,自动提交 offset 引起.

漏消费,设置 offset 为手动提交,当 offset 被提交吋,数据还在内存中末落盘,此时刚好消费者线程被 kill 掉,那么 offset 己经提交,但是数据未处理,导致这部分内存中的数据丢失。

image-20221102231037184

# 19.消费者事务

如果想完成 Consumer 端的精准一次性消费,那么需要 Kafka 消费端将消费过程和提交 offset 过程做原子绑定。此时我们需要将 Kafka 的 offset 保存到支持事务的自定义介质(比如 MySQL)。

下游消费者必须支持事务,才能做到精确一次消费.

# 20.如何提高消费者吞吐量?

要提高 Kafka 消费者的吞吐量,您可以考虑以下一些策略和优化措施:

  1. 增加并发消费者:将消费者组内的消费者数量增加到适当的数量,以实现并行处理多个分区的数据。这可以有效地提高整体吞吐量。

  2. 调整分区数量:如果主题的分区数量较少,可以考虑增加分区数,以便将负载均衡地分布给多个消费者,从而提高并行处理的能力。

  3. 合理设置批量拉取大小:通过适当设置 max.poll.records 参数,让消费者一次性拉取多个消息,减少网络通信次数,提高吞吐量。

  4. 增加处理线程数:将消费的消息处理逻辑放入多个线程中,可以并行处理消息,提高消费者的处理能力。

  5. 提高消息处理效率:优化消息处理逻辑,减少不必要的计算和操作,以提高每个消费者的处理效率。

  6. 使用多个分区分配策略:尝试不同的分区分配策略,选择适合您应用的策略,以实现更好的负载均衡和吞吐量。

  7. 合理配置消费者参数:根据实际情况,调整消费者的参数,如 fetch.min.bytesfetch.max.wait.ms 等,以达到更好的性能表现。

  8. 避免长时间处理:在消息处理逻辑中避免长时间的阻塞操作,以免影响消费者的吞吐量。

  9. 使用消费者缓存:可以考虑在消费者中使用缓存,以减少对外部资源的频繁访问,提高数据读取效率。

  10. 监控和调优:定期监控消费者的性能指标,如吞吐量、延迟等,进行适时的调优和优化。

请注意,吞吐量的提升是一个综合性的问题,需要综合考虑消费者的配置、分区情况、消息处理逻辑等多个因素。您可以根据具体情况选择合适的策略进行优化。

# 21.消费者参数

配置参数 描述
fetch.min.bytes 一次拉取的最小数据量。默认为 1 字节。
fetch.max.bytes 一次拉取的最大数据量。默认为 50MB。
fetch.max.wait.ms 最大等待时间,即拉取消息的最长等待时间。默认为 500 毫秒。
max.partition.fetch.bytes 每个分区返回的最大数据量。默认为 1MB。
max.poll.records 每次 poll 操作返回的最大记录数。默认为 500 条。
connections.max.idle.ms 多久之后关闭空闲的连接。默认为 9 分钟。
receive.buffer.bytes 接收缓冲区的大小。默认为 64KB。
send.buffer.bytes 发送缓冲区的大小。默认为 128KB。

这些配置参数可以用于优化 Kafka 消费者的性能和吞吐量,根据您的业务需求和实际情况,可以适当地进行调整。

# 22.消费者协调器

如果消费者客户端中配置了两个分配策略,那么以哪个为准呢?如果有多个消费者,彼此所配置的分配策略并不完全相同,那么以哪个为准?多个消费者之间的分区分配是需要协同的,那么这个协同的过程又是怎样的呢?这一切都是交由消费者协调器(Consumer Coordinator)组协调器(Group Coordinator) 来完成的,它们之间使用一套组协调协议进行交互。

旧版:使用 zookeeper 监听器来完成的.每个消费者在启动时都会在/consumers/group/ids 和/brokers/ids 路径上注册一个监听器。当/consumers/ group/ids 路径下的子节点发生变化时,表示消费组中的消费者发生了变化;当/brokers/ ids 路径下的子节点发生变化时,表示 broker 出现了增减。这样通过 ZooKeeper 所提供的 Watcher,每个消费者就可以监听消费组和 Kafka 集群的状态了。

这种方式下每个消费者对 ZooKeeper 的相关路径分别进行监听,当触发再均衡操作时,一个消费组下的所有消费者会同时进行再均衡操作,而消费者之间并不知道彼此操作的结果,这样可能导致 Kafka 工作在一个不正确的状态。与此同时,这种严重依赖于 ZooKeeper 集群的做法还有两个比较严重的问题。

  • 羊群效应(Herd Effect) :所谓的羊群效应是指 ZooKeeper 中一个被监听的节点变化,大量的 Watcher 通知被发送到客户端,导致在通知期间的其他操作延迟,也有可能发生类似死锁的情况。
  • 脑裂问题(Split Brain) :消费者进行再均衡操作时每个消费者都与 ZooKeeper 进行通信以判断消费者或 broker 变化的情况,由于 ZooKeeper 本身的特性,可能导致在同一时刻各个消费者获取的状态不一致,这样会导致异常问题发生。

新版:新版的消费者客户端对此进行了重新设计,将全部消费组分成多个子集,每个消费组的子集在服务端对应一个 GroupCoordinator 对其进行管理,GroupCoordinator 是 Kafka 服务端中用于管理消费组的组件。而消费者客户端中的 ConsumerCoordinator 组件负责与 GroupCoordinator 进行交互。ConsumerCoordinatorGroupCoordinator 之间最重要的职责就是负责执行消费者再均衡的操作,包括前面提及的分区分配的工作也是在再均衡期间完成的。就目前而言

# 23._consumer_offsets 剖析

_consumer_offsets 是 kafka 默认创建的,一共 50 个分区.

一般情况下,当集群中第一次有消费者消费消息时会自动创建_consumer_offsets,它的副本因子受 offsets.topic.replication.factor 参数的约束,默认值为 3,分区数可以通过 offsets.topic.num.partitions 参数设置,默认值为 50。

客户端提交消费位移是使用 OffsetCommitRequest 请求实现的,OffsetCommitRequest 的结构如图所示。

retention_time 表示当前提交的消费位移所能保留的时长,不过对于消费者而言这个值保持为-1。也就是说,按照 broker 端的配置 offsets.retention.minutes 来确定保留时长。offsets.retention.minutes 的默认值为 10080,即 7 天,超过这个时间的消息会被删除.

image-20221109174605738

同消费组的元数据信息一样,最终提交的消费位移也会以消息的形式发送至主题 consumer_offsets, 与消费位移对应的消息也只定义了 keyvalue 字段的具体内容,它不依赖于具体版本的消息格式,以此做到与具体的消息格式无关。

image-20221109174925725

key 中除了 version 字段还有 group、topic、partition 字段,分别表示消费组的 groupld、主题名称和分区编号。虽然 key 中包含了 4 个字段,但最终确定这条消息所要存储的分区还是根据单独的 group 字段来计算的,这样就可以保证消费位移信息与消费组对应的 GroupCoordinator 处于同一个 broker 节点上,省去了中间轮转的开销,这一点与消费组的元数据信息的存储是一样的。

value 中包含了 5 个字段,除 version 字段外,其余的 offset、metadata、commit_ timestamp、 expire timestamp 字段分别表示消费位移、自定义的元数据信息、位移提交到 Kafka 的时间戳、消费位移被判定为超时的时间戳。其中 offset 和 metadata 与 OffsetCommitRequest 请求体中的 offset 和 metadata 对应,而 expire .timestamp 和 OffsetCommitRequest 请求体中的 retention* time 也有关联,commit* timestamp 值与 offsets.retention.minutes 参数值之和即为 expire_ _timestamp (默认情况下)。

在处理完消费位移之后,Kafka 返回 OffsetCommitResponse 给客户端,OffsetCommitResponse 的结构如图所示。

image-20221109175146685

冷门知识:如果有若干消费者消费了某个主题中的消息,并且也提交了相应的消费位移,那么在删除这个主题之后会一并将这些消费位移信息删除

# 24.消费者的部署模式

Apache Kafka 的消费者(Consumers)可以以多种方式进行部署,以满足不同的可扩展性和容错性需求。以下是一些常见的消费者部署模式:

单一消费者(Single Consumer):这是最简单的部署模式,其中只有一个消费者从一个或多个 Kafka 主题(Topics)中读取数据。这种模式适用于低吞吐量和不需要高可用性的应用场景。

消费者组(Consumer Groups):在消费者组中,多个消费者可以协同工作,共同消费一个或多个主题的数据。Kafka 会自动地将每个主题的分区(Partitions)分配给消费者组中的各个消费者,以实现负载均衡。如果一个消费者失败,Kafka 会重新分配分区给其他可用的消费者。

竞争消费者(Competing Consumers):在这种模式下,多个独立的消费者或消费者组竞争同一个主题的数据。这意味着同一个消息可能会被多个消费者处理,通常用于实现任务分派。

多实例消费者(Multi-Instance Consumers)在这种模式下,一个单一的消费逻辑可以部署为多个实例,每个实例都有自己的消费者组 ID。这样,每个实例都能从主题中读取所有消息,实现多重处理或多重存储。

无组消费者(Simple Consumers without Groups)在某些情况下,你可能希望消费者直接控制读取哪个分区,而不是使用 Kafka 的消费者组来自动进行分区分配。这通常需要更多的编程工作,但提供了更大的控制灵活性。

镜像消费者(Mirror Consumers)用于跨集群复制数据。例如,可以使用 Kafka MirrorMaker 工具将数据从一个 Kafka 集群复制到另一个集群。

全局消费者(Global Consumers)某些应用场景需要从所有分区读取所有消息,而不仅仅是某个分区的一部分。全局消费者通常用于构建全局状态或进行复杂的事件处理。

# 四.broker 服务

# 1.基础架构

一个典型的 Kafka 体系架构包括若干 Producer、若干 Broker、若干 Consumer ,以及一个 ZooKeeper 集群。其中 ZooKeeper 是 Kafka 用来负责集群元数据的管理、控制器的选举等操作的。Producer 将消息发送到 Broker, Broker 负责将收到的消息存储到磁盘中,而 Consumer 负责从 Broker 订阅并消费消息。

Server.config 需要配置 3 个参数

  • brokerid:唯一
  • 日志:地址
  • zk:地址

image-20231107000254229

# 2.broker 的工作流程

Broker: 服务代理节点。对于 Kafka 而言,Broker 可以简单地看作一个独立的 Kafka 服务节点或 Kafka 服务实例。大多数情况下也可以将 Broker 看作一台 Kafka 服务器,前提是这台服务器上只部署了一个 Kafka 实例。一个或多个 Broker 组成了一个 Kafka 集群。一般而言,我们更习惯使用首字母小写的 broker 来表示服务代理节点。

image-20221024113734400

# 3.存储机制

Kafka 的 broker 存储机制是其核心设计之一,它采用了一种持久化、高性能、分布式的存储方式,以支持高吞吐量和持久化的消息传递。Kafka 通过日志(Log)的方式来存储消息,以下是有关 Kafka broker 存储机制的重要概念:

  1. 日志(Log):Kafka 的存储基本单元是日志,每个主题(Topic)都有一个或多个分区(Partition),每个分区就是一个日志。消息按照顺序追加到日志的末尾,因此消息的顺序得以保持。

  2. 分区(Partition):分区是主题的逻辑划分,用于实现数据水平扩展和负载均衡。每个分区都有一个唯一的编号,分区内的消息是有序的。消费者可以并行地从不同的分区读取消息。

  3. 段(Segment):日志被划分为多个段,每个段对应一个物理文件,用于存储消息。当一个段达到一定的大小限制时,会被关闭并创建新的段。这种方式有助于提高读写性能和管理。

  4. 索引(Index):每个段都有一个索引文件,用于存储消息的索引信息。索引使得 Kafka 可以快速地定位消息的位置,从而提高消息的读取效率。

  5. 快照(Snapshot):Kafka 会定期对日志进行快照,将段中的消息写入快照文件,以实现数据的压缩和删除过期数据。这有助于减少磁盘占用和提高性能。

  6. 日志段清理(Log Compaction):除了定期快照,Kafka 还支持日志段清理,即按照键值对的方式保留最新的消息,从而删除相同键的旧消息。这有助于保留关键数据,例如最新的状态变更。

总体来说,Kafka 的存储机制以日志为基础,通过分区、段、索引、快照和日志段清理等机制来实现高效的消息存储和管理。这种设计使得 Kafka 能够处理大规模的消息流,同时保证数据的可靠性和持久性。

# 4.zk 中存储了哪些信息?

节点 内容 说明
/kafka/brokers/ids [0,1,2] 记录有哪些服务器
/kafka/brokers/topics/first/partitions/0/state {“leader”:1, “isr”:[1,0,2]} 记录谁是 Leader,有哪些服务器可用
/kafka/controller {"brokerid":0} 辅助选举 Leader

image-20231107000321919

  • admin: 存储管理员接口操作的相关信息,主要为 topic 删除事件,分区迁移事件,优先副本选举,信息 (一般为临时节点)

  • brokers: 主要存储 broker 相关的信息,broker 节点以及节点上的 topic 相关信息

  • cluster: 存储 kafka 集群信息

  • config: 存储 broker,client,topic,user 以及 changer 相关的配置信息

  • consumers: 存放消费者相关信息 (一般为空)

  • controller: 用于存放控制节点信息 (注意:该节点是一个临时节点,用于 controller 节点注册)

  • controller_epoch: 用于存放 controller 节点当前的年龄

  • isr_change_notification: 用于存储 isr 的变更通知 (临时节点,当有 isr 进行变动时,会用于事件通知,可进行 watch 获取集群 isr 状态变更)

  • latest_producer_id_block: 该节点用于存储处理事务相关的 pid 范围

  • log_dir_event_notification: 日志目录事件通知

# 5.offset 存储位置?

  • 0.9 版本之前保存在 zookeeper 下的 consumers
  • 0.9 版本之后 offset 存储在 kafka 主题中,该 topic 为_consumer_offsets,不在 zookeeper

consumer_offsets 是 kafka 自行创建的,和普通的 topic 相同。它存在的目的之一就是保存 consumer 提交的位移。

consumer_offsets 的每条消息格式大致如图所示:

image-20221102180305338

可以想象成一个 KV 格式的消息,key 就是一个三元组:group.id+topic+分区号,而 value 就是 offset 的值。

考虑到一个 kafka 生成环境中可能有很多 consumerconsumer group,如果这些 consumer 同时提交位移,则必将加重 **consumer_offsets 的写入负载,因此 kafka 默认为该 topic 创建了 50 个分区,并且对每个 group.id 做哈希求模运算,从而将负载分散到不同的 **consumer_offsets 分区上。

一般情况下,当集群中第一次有消费者消费消息时会自动创建 _consumer_offsets,它的副本因子受 offsets.topic.replication.factor 参数的约束,默认值为 3,分区数可以通过 offsets.topic.num.partitions 参数设置,默认值为 50。

# 6.Leader 选举的过程

kafka 中的控制器选举工作依赖于 Zookeeper,成功竞选成为控制器的 broker 会在 Zookeeper 中创建/controller 临时节点。

每个 broker 启动的时候会去尝试读取/controller 节点的 brokerid 的值,如果读取到的 brokerid 的值不为-1,表示已经有其他 broker 节点成功竞选为控制器,所以当前 broker 就会放弃竞选;

如果 Zookeeper 中不存在/controller节点,或者这个节点的数据异常,那么就会尝试去创建/controller 节点,创建成功的那个 broker 就会成为控制器。

每个 broker 都会在内存中保存当前控制器的 brokerid 值,这个值可以标识为 activeControllerId

Zookeeper 中还有一个与控制器有关的/controller_epoch 节点,这个节点是持久节点,节点中存放的是一个整型的 controller_epoch 值。controller_epoch 值用于记录控制器发生变更的次数。

controller_epoch 的初始值为 1,即集群中的第一个控制器的纪元为 1,当控制器发生变更时,每选出一个新的控制器就将该字段值加 1。

每个和控制器交互的请求都会携带 controller_epoch 这个字段,

  • 如果请求的 controller_epoch 值小于内存中的 controller_epoch 值,则认为这个请求是向已经过期的控制器发送的请求,那么这个请求会被认定为无效的请求。

  • 如果请求的 controller_epoch 值大于内存中的 controller_epoch 值,那么说明已经有新的控制器当选了

image-20221026230811688

image-20221026231058433

image-20221026231210511

选取原则:先看 isr 中有没有,如果没有,直接 pass,如果有,继续看副本中的顺序,以副本中的顺序进行选取.

# 7.什么是 LEO 和 HW?

缩写 全称 描述
LEO Log End Offset 每个副本的最后一个 offset,实际上是最新的 offset + 1。
HW High Watermark 所有副本中最小的 LEO,消费者只能消费 HW 之前的消息。HW/LEO 都指最后一条消息的下一条位置。
LSO Last Stable Offset 对于未完成的事务,LSO 等于事务中第一条消息的位置(firstUnstableOffset)。对已完成的事务,值同 HW。
LW Low Watermark 低水位,代表 AR 集合中最小的 logStartOffset 值。

对于副本而言,还有两个概念:本地副本(Local Replica)远程副本(Remote Replica) ,本地副本是指对应的 Log 分配在当前的 broker 节点上,远程副本是指对应的 Log 分配在其他的 broker 节点上。在 Kafka 中,同一个分区的信息会存在多个 broker 节点上,并被其上的副本管理器所管理,这样在逻辑层面每个 broker 节点上的分区就有了多个副本,但是只有本地副本才有对应的日志。

如图所示,某个分区有 3 个副本分别位于 broker0、broker1 和 broker2 节点中,其中带阴影的方框表示本地副本。假设 broker0 上的副本 1 为当前分区的 Leader 副本,那么副本 2 和副本 3 就是 Follower 副本,整个消息追加的过程可以概括如下:

  1. 生产者客户端发送消息至 Leader 副本(副本 1) 中。
  2. 消息被追加到 Leader 副本的本地日志,并且会更新日志的偏移量。
  3. Follower 副本(副本 2 和副本 3) 向 Leader 副本 请求同步数据。
  4. Leader 副本所在的服务器读取本地日志,并更新对应拉取的 Follower 副本的信息。
  5. Leader 副本所在的服务器将拉取结果返回给 Follower 副本。
  6. Follower 副本收到 Leader 副本 返回的拉取结果,将消息追加到本地日志中,并更新日志的偏移量信息。

image-20221109180535325

# 8.LEO 和 HW 更新逻辑?

  1. Leader 中写入消息,LEO 为 5,HW 为 0
  2. Follower 请求 Leader,带上自己的 LEO 值
  3. Leader 更新自己的 HW 值 min(自己的 HW,副本的 LEO)
  4. Leader 发送消息给 Follower,带上 HW 值
  5. Follower 接收消息,LEO 接收的不同,但是 HW 和 Leader 保持一致
  6. 重复上述操作

image-20221109180757815

在一个分区中,Leader 副本所在 的节点会记录所有副本的 LEO,而 Follower 副本所在的节点只会记录自身的 LEO,而不会记录其他副本的 LEO。对 HW 而言,各个副本所在的节点都只记录它自身的 HWLeader 副本中带有其他 Follower 副本的 LEO,那么它们是什么时候更新的呢?

Leader 副本收到 Follower 副本的 FetchRequest 请求之后,它首先会从自己的日志文件中读取数据,然后在返回给Follower 副本数据前先更新 Follower 副本的 LEO

# 9.数据丢失

在 Kafka 中的两种情况,其中副本(replica)的高水位线(High Watermark,HW)的变化会影响数据的持久性和恢复。

场景 描述
a 是 Follower,b 是 Leader,a 的 HW=1,b 的 HW=2 时,a 宕机 在这种情况下,如果 a 宕机,a 上的数据 m2 将丢失,因为 a 的 HW 达到 1,而数据 m2 的 HW 为 2。
b 宕机,a 变为 Leader,b 恢复为 Follower,HW 被截断为 HW=1 如果 b 宕机,a 变为 Leader,b 恢复为 Follower,并且 HW 被截断为 HW=1,数据 m2 会丢失。

这些情况说明了 Kafka 中副本的 HW 的重要性,它决定了消费者可以读取的消息范围,同时也影响了数据的持久性和恢复。

image-20221109224810854

# 10.数据不一致

  • a 为 Leader,b 为 Follower,a 的 HW 和 LEO 都为 2,b 的 HW 和 LEO 都为 1,此时,a 和 b 同时挂掉,b 先恢复,且写入了一条数据 m3,更新 HW 和 LEO 的值为 2
  • b 恢复,变为 Follower,此时 a 和 b 的 HW 都为 2,不做任何调整,但是数据不一致,一个是 m2 一个是 m3
  • 主要出现在 Leader 和 Follower 同时挂掉,但是 Follower 先恢复,且插入了一条数据,HW 相同,但是数据不一致

image-20221109225230267

# 11.Leader epoch

为了解决上述两种问题,Kafka 从 0.11.0.0 开始引入了Leader epoch的概念,在需要截断数据的时候使用 Leader epoch 作为参考依据而不是原本的 HW。Leader epoch 代表 Leader 的纪元信息(epoch) ,初始值为 0。每当 Leader 变更一次,Leader epoch 的值就会加 1, 相当于为 Leader 增设了一个版本号。与此同时,每个副本中还会增设一个矢量 LeaderEpoch=StartOffset,其中 StartOffset 表示当前 LeaderEpoch 下写入的第一条消息的偏移量。每个副本的 Log 下都有一个 Leader-epoch-checkpoint 文件,在发生 Leader epoch 变更时,会将对应的矢量对追加到这个文件中。

  • 数据丢失:请求 le 的值,如果相同,不做任何处理,如果不同,则返回 Leader 的 LEO 值,和 Follower 的 LEO 比较,如果相同则不做处理
  • 数据不一致:如果 le 的值变了,则截断数据,保持 LEO 和 HW 同步

image-20221109225848204

# 12.Follower 故障处理

  1. Follower 发生故障后会被临时踢出 ISR
  2. 这个期间 Leader 和 Follower 继续接收数据
  3. 待该 Follower 恢复后,Follower 会读取本地磁盘记录的上次的 HW,并将 log 文 件高于 HW 的部分截取掉,从 HW 开始向 Leader 进行同步。

image-20221026234223820

# 13.Leader 故障处理

  1. Leader 发生故障之后,会从 ISR 中选出一个新的 Leader
  2. 为保证多个副本之间的数据一致性, 其余的 Follower 会先将各自的 log 文件高于 HW 的部分截掉,然后从新的 Leader 同步数据。

注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复

image-20221026234419350

# 14.Leader partition 自平衡

正常情况下,Kafka本身会自动把 Leader Partition 均匀分散在各个机器上,来保证每台机器的读写吞吐量都是均匀的。但是如果某些 broker 宕机,会导致 Leader Partition 过于集中在其他少部分几台 broker 上,这会导致少数几台 broker 的读写请求压力过高,其他宕机的 broker 重启之后都是 Follower partition,读写请求很低,造成集群负载不均衡。

  • auto.leader.rebalance.enable,默认是 true。自动 Leader Partition 平衡
  • leader.imbalance.per.broker.percentage,默认是 10%。每个 broker 允许的不平衡的 Leader 的比率。如果每个 broker 超过了这个值,控制器会触发 Leader 的平衡。
  • leader.imbalance.check.interval.seconds,默认值 300 秒。检查 Leader 负载是否平衡的间隔时间。
配置项 默认值 描述
auto.leader.rebalance.enable true 启用自动 Leader Partition 平衡
leader.imbalance.per.broker.percentage 10% 每个 broker 允许的不平衡的 Leader 的比率
leader.imbalance.check.interval.seconds 300 秒 检查 Leader 负载是否平衡的间隔时间

假设集群只有一个主题如下图所示,有四个分区,Leader 分别是 0,1,2,3,对应的副本信息如下图所示.

image-20221026235546463

针对 broker 0 节点,分区 2 的 AR 优先副本是 0 节点,但是 0 节点却不是 Ieader 节点,所以不平衡数加 1,AR 副本总数是 4,所以 broker 0 节点不平衡率为 1/4>10%,需要再平衡。

# 15.复制限流

分区重分配本质在于数据复制,先增加新的副本,然后进行数据同步,最后删除旧的副本来达到最终的目的。

数据复制会占用额外的资源,如果重分配的量太大必然会严重影响整体的性能,尤其是处于业务高峰期的时候。减小重分配的粒度,以小批次的方式来操作是一种可行的解决思路。

如果集群中某个主题或某个分区的流量在某段时间内特别大,那么只靠减小粒度是不足以应对的,这时就需要有一个限流的机制,可以对副本间的复制流量加以限制来保证重分配期间整体服务不会受太大的影响。

# 16.分区数的上下限

下限:性能测试工具是 Kafka 本身提供的用于生产者性能测试的 kafka-producer-perf-test.sh 和用于消费者性能测试的 kafka-consumer-perf-test.sh

测试指标 解释
records sent 测试时发送的消息总数
records/sec 以每秒发送的消息数来统计吞吐量
MB/sec 以每秒发送的消息大小来统计吞吐量
avg latency 消息处理的平均耗时
max latency 消息处理的最大耗时
50th 50% 的消息处理耗时
95th 95% 的消息处理耗时
99th 99% 的消息处理耗时
99.9th 99.9% 的消息处理耗时

随着分区数的增加,相应的吞吐量也会有所增长。一旦分区数超过了某个阈值之后,整体的吞吐量也是不升反降的,说明了分区数越多并不会使吞吐量一直增长。

一味地增加分区数并不能使吞吐量一直得到提升,并且分区数也并不能一直增加,如果超过默认的配置值,还会引起 Kafka 进程的崩溃。

# 17.时间轮

Kafka 中存在大量的延时操作,比如延时生产延时拉取延时删除等。Kafka 并没有使用 JDK 自带的 Timer 或 DelayQueue 来实现延时的功能,而是基于时间轮的概念自定义实现了一个用于延时功能的定时器(SystemTimer) 。JDK 中 Timer 和 DelayQueue 的插入和删除操作的平均时间复杂度为 O (nlogn) 并不能满足 Kafka 的高性能要求,而基于时间轮可以将插入和删除操作的时间复杂度都降为 O(1) 。时间轮的应用并非 Kafka 独有,其应用场景还有很多,在 Netty、QuartzZooKeeper 等组件中都存在时间轮的踪影。

Kafka 中的时间轮(TimingWheel) 是一个存储定时任务的环形队列,底层采用数组实现,数组中的每个元素可以存放一个定时任务列表(TimerTaskList) 。TimerTaskList 是一个环形的双向链表,链表中的每一项表示的都是定时任务项(TimerTaskEntry),其中封装了真正的定时任务(TimerTask)。

时间轮由多个时间格组成,每个时间格代表当前时间轮的基本时间跨度(tickMs) 。时间轮的时间格个数是固定的,可用 wheelSize 来表示,那么整个时间轮的总体时间跨度(interval) 可以通过公式 tickMsxwheelSize 计算得出。时间轮还有一个表盘指针(currentTime),用来表示时间轮当前所处的时间,currentTime 是 tickMs 的整数倍.currentTime 可以将整个时间轮划分为到期部分和未到期部分,currentTime 当 前指向的时间格也属于到期部分,表示刚好到期,需要处理此时间格所对应的 TimerTaskL ist 中的所有任务。

# 18.为什么不支持读写分离?

  • 数据一致性问题。数据从主节点转到从节点必然会有一个延时的时间窗口,这个时间窗口会导致主从节点之间的数据不一致。某一时刻,在主节点和从节点中 A 数据的值都为 X,之后将主节点中 A 的值修改为 Y,那么在这个变更通知到从节点之前,应用读取从节点中的 A 数据的值并不为最新的 Y,由此便产生了数据不一致的问题。
  • 延时问题。类似 Redis 这种组件,数据从写入主节点到同步至从节点中的过程需要经历网络 → 主节点内存 → 网络 → 从节点内存这几个阶段,整个过程会耗费一定的时间。而在 Kafka 中,主从同步会比 Redis 更加耗时,它需要经历网络 → 主节点内存 → 主节点磁盘 → 网络 → 从节点内存 → 从节点磁盘这几个阶段。对延时敏感的应用而言,主写从读的功能并不太适用。

# 19.kafka 中的选举?

因为 kafka 中涉及多处选举机制,容易搞混,kafka 由三个方面会涉及到选举:

  • broker(控制器)选 Leader

  • 分区多副本选 Leader

  • 消费者选 Leader

在 kafka 集群中由很多的 broker(也叫做控制器),但是他们之间需要选举出一个 Leader,其他的都是 FollowerbrokerLeader 有很重要的作用,诸如:创建、删除主题、增加分区并分配 Leader 分区;集群 broker 管理,包括新增、关闭和故障处理;分区重分配(auto.leader.rebalance.enable=true,后面会介绍),分区 Leader 选举。

每个 broker 都有唯一的 brokerId,他们在启动后会去竞争注册 zookeeper 上的 Controller 结点,谁先抢到,谁就是 broker leader。而其他 broker 会监听该结点事件,以便后续 leader 下线后触发重新选举。

# 20.kafka 副本作用?

在 Apache Kafka 中,副本(Replicas)是一种关键的概念,它们在数据持久性、可靠性和容错性方面起着重要作用。Kafka 是一种分布式流数据平台,用于处理和传输实时数据流,副本是实现其高可用性和可靠性的核心机制之一。以下是副本在 Kafka 中的作用:

  1. 数据冗余和可靠性: 副本机制允许 Kafka 将同一主题(Topic)的数据副本保存在不同的 Kafka 节点上。每个主题的分区(Partition)可以有多个副本,这些副本分布在不同的 Kafka 节点上,以确保数据的冗余存储。如果某个节点发生故障,其他副本仍然可以提供数据,从而保证数据的可靠性。

  2. 故障容错: 当一个或多个节点发生故障时,Kafka 可以使用副本中的数据继续提供服务。如果一个节点不可用,其他节点上的副本仍然可以被用来处理数据请求,确保系统的可用性和持续性。

  3. 负载均衡: Kafka 允许将副本分布在不同的节点上,这有助于分摊数据读写的负载。当有多个副本时,读取请求可以从多个副本中选择,从而分散读取压力。

  4. 数据恢复: 副本允许 Kafka 在发生数据丢失或损坏的情况下进行数据恢复。如果某个副本上的数据损坏,可以从其他副本中复制数据进行恢复。

  5. 扩展性: 副本机制为 Kafka 的水平扩展提供了基础。通过增加副本和节点,可以提高 Kafka 集群的吞吐量和容量。

副本在 Kafka 中起到了确保数据可靠性、容错性和高可用性的关键作用。它们是构建可靠数据流架构的基础组成部分,帮助应对故障和数据丢失等挑战。

# 五.kafka 文件

# 1.消息格式

下图左边的 RECORD 部分就是消息格式,每个 RECORD 必定对应一个 offsetmessage sizeoffset 用来标志它在 Partition 中的偏移量 ,这个 offset 是逻辑值,而非实际物理偏移值,message size 表示消息的大小,这两者在一起被称为日志头部 LOG_OVERHEAD ,固定为 12B 。LOG_OVERHEADRECORD 一起用来描述一条消息。

与消息对应的还有消息集的概念(Message Set下图中的右边部分),消息集中包含一条或多条消息,消息集不仅是存储于磁盘及在网络上传输(Produce & Fetch)的基本形式,而且是 Kafka 中压缩的基本单元。

  • crc32 校验值(4 字节):使用 CRC32 校验算法计算得出的校验值,用于校验消息在指定范围内是否完整和正确。校验的范围从 magic 值开始,直到 value 值结束。
  • magic 值(1 字节):表示消息的格式版本号。在这个消息格式中,magic 值为 0,表示使用 v0 版本的消息格式。
  • attributes(1 字节):用于描述消息的属性。低 3 位表示压缩类型,其余位保留。各个压缩类型的值如下:
    • 0 表示 NONE,即未压缩。
    • 1 表示 GZIP 压缩。
    • 2 表示 SNAPPY 压缩。
    • 3 表示 LZ4 压缩(从 Kafka 0.9.x 版本引入)。
  • key(可选):消息的键,用于标识消息。如果没有键,则无此字段。
  • key length(4 字节):表示消息键的长度。如果键为 null,则此值为 -1。
  • value(消息体):消息的主体内容。可以为空,例如用于表示墓碑(tombstone)消息。
  • value length(4 字节):表示消息主体内容的长度。如果主体内容为空,则此值为 -1。

image-20221110172805437

# 2.kafka 文件存储机制

主题分区是 Kafka 的两个核心概念。主题作为消息的归类,可以再细分为一个或多个分区,分区也可以看作对消息的二次归类。分区的划分不仅为 Kafka 提供了可伸缩性、水平扩展的功能,还通过多副本机制来为 Kafka 提供数据冗余提高数据可靠性

从 Kafka 的底层实现来说,主题分区都是逻辑上的概念,分区可以有一至多个副本,每个副本对应一个日志文件,每个日志文件对应一至多个日志分段(LogSegment) ,每个日志分段还可以细分为索引文件日志存储文件快照文件等。不过对于使用 Kafka 进行消息收发的普通用户而言,了解到分区这一层面足以应对大部分的使用场景。

Topic 是逻辑上的概念,而 partition 是物理上的概念,每个分区对应一个日志文件,,该 log 文件中存储的就是 Producer 生产的数据。Producer 生产的数据会被不断追加到该 log 文件末端,为防止 log 文件过大导致数据定位效率低下,Kafka 采取了分片和索引机制,将每个 partition 分为多个 segment

每个 segment 包括:index文件.log文件.timeindex 等文件。这些文件位于一个文件夹下,该文件夹的命名规则为:topic 名称+分区序号,例如:first-0。

  • .log 日志文件
  • .index 偏移量索引文件
  • .timeindex 时间戳索引文件
  • 其他文件

说明:index 和 log 文件以当前 segment 的第一条消息的 offset 命名。

image-20221026235959819

# 3.日志索引

每个日志分段文件对应了两个索引文件,主要用来提高查找消息的效率。偏移量索引文件用来建立消息偏移量 (offset)到物理地址之间的映射关系,方便快速定位消息所在的物理文件位置;时间戳索引文件则根据指定的时间戳(timestamp)来查找对应的偏移量信息。

Kafka 中的索引文件以稀疏索引(sparse index) 的方式构造消息的索引,它并不保证每个消息在索引文件中都有对应的索引项。每当写入一定量(由 broker 端参数 log.index.interval.bytes 指定,默认值为 4096,即 4KB) 的消息时,偏移量索引文件和时间戳索引文件分别增加一个偏移量索引项和时间戳索引项,增大或减小 log.index.interval.bytes 的值,对应地可以增加或缩小索引项的密度。

稀疏索引通过 MappedByteBuffer 将索引文件映射到内存中,以加快索引的查询速度。

为了便于消息的检索,每个 LogSegment 中的日志文件(以“.log”为文件后缀)都有对应的两个索引文件:偏移量索引文件(以“.index”为文件后缀)和时间戳索引文件(以“.timeindex”为文件后缀)。每个 LogSegment 都有一个基准偏移量 baseOffset,用来表示当前 LogSegment 中第一条消息的 offset。偏移量是一个 64 位的长整型数,日志文件和两个索引文件都是根据基准偏移量(baseOffset)命名的,名称固定为 20 位数字,没有达到的位数则用 0 填充。比如第一个 LogSegment 的基准偏移量为 0,对应的日志文件为 00000000000000000000.log。

# 4.稀疏索引

当谈到 Kafka 的稀疏索引时,我们实际上是在讨论 Kafka 日志段(Log Segment)的索引结构。Kafka 通过维护这些稀疏索引来提高消息的检索效率。下面详细介绍稀疏索引在 Kafka 中的工作原理和作用:

  1. Log Segment(日志段):Kafka 中的消息被分成不同的日志段,每个日志段保存一定数量的消息。每个日志段都有一个基础偏移量(Base Offset),表示该日志段中第一条消息的偏移量。

  2. Index Segment(索引段):为了加速消息的检索,Kafka 为每个日志段维护一个索引段。索引段由一系列索引条目组成,每个条目对应一定范围的消息。这些索引条目保存了一些关键信息,如消息的起始偏移量、消息的物理位置等。

  3. Sparse Indexing(稀疏索引):Kafka 的稀疏索引并不会为每条消息都创建一个索引条目,而是以一定的间隔创建索引条目。这个间隔通常是根据消息的数量和大小来动态调整的。这种设计可以有效减小索引的存储开销,并且仍然能够在检索时快速定位到消息。

  4. 索引条目结构:每个索引条目通常包括以下信息:

    • 起始偏移量:表示此索引条目所覆盖的消息范围的起始偏移量。
    • 物理位置:表示消息在日志段文件中的实际位置,这可以帮助快速定位并读取消息。
  5. Compressed Index(压缩索引):为了减小索引的存储开销,Kafka 还采用了一些压缩技术。索引数据在内存中被压缩,以便更多的索引条目可以保存在有限的内存空间中。

  6. 查询过程:当消费者需要检索消息时,它可以使用稀疏索引来快速定位到所需消息的日志段,并进一步读取这些日志段以获取消息内容。这种方式避免了在整个日志中线性搜索消息,从而提高了检索效率。

通过使用稀疏索引,Kafka 能够在高吞吐量和大数据量的场景中高效地管理消息存储和检索。稀疏索引的设计减小了索引的存储开销,同时仍然保持了良好的检索性能,使得 Kafka 能够满足实时流处理和消息传递的需求。

# 5.log 文件和 index 文件

  1. 根据目标 offset 定位 Segment 文件
  2. 找到小于等于目标 offset 的最大 offset 对应的索引项
  3. 定位到 log 文件
  4. 向下遍历找到目标 Record

比如:要查找绝对 offset 为 7 的 Message

  1. 用二分查找确定它是在哪个 LogSegment 中,自然是在第一个 Segment 中。
  2. 打开这个 Segment 的 index 文件,也是用二分查找找到 offset 小于或者等于指定 offset 的索引条目中最大的那个 offset。自然 offset 为 6 的那个索引是我们要找的,通过索引文件我们知道 offset 为 6 的 Message 在数据文件中的位置为 9807。
  3. 打开数据文件,从位置为 9807 的那个地方开始顺序扫描直到找到 offset 为 7 的那条 Message。

image-20230811105956701

注意

  1. index 为稀疏索引,大约每往 log 文件写入4kb 数据,会往 index 文件写入一条索引。参数 log.index.interval.bytes 默认 4kb。
  2. index 文件中保存的 offset 为相对 offset, 这样能确保 offset 的值所占空间不会过大,因此能将 offset 的值控制在固定大小

# 6.文件清理策略

Kafka 中默认的日志保存时间为 7天,可以通过调整如下参数修改保存时间。

配置项 描述
log.retention.hours 日志保留时间,最低优先级,单位:小时
log.retention.minutes 日志保留时间,单位:分钟
log.retention.ms 日志保留时间,最高优先级,单位:毫秒
log.retention.check.interval.ms 检查周期,单位:毫秒,默认值:5 分钟

那么日志一旦超过了设置的时间,怎么处理呢? Kalka 中提供的日志清理策略有 delete 和 compact 两种。

# 7.日志删除策略

delete 日志删除:将过期数据删除

  • log.cleanup.policy =delete 所有数据启用删除策略
    1. 基于时间:默认打开。以 segment 中所有记录中的最大时间截作为该文件时间戳。
    2. 基于大小:默认关闭。超过设置的所有日志总大小,删除最早的 segment。
  • Iog.retention.bytes,默认等于-1,表示无穷大。

思考:如果一个 segment 中有一部分数据过期,一部分没有过期,怎么处理?

只有最大的过期才能删除

kafka 提供了一个墓碑消息(tombstone)的概念。如果一条消息的 key 不为 null,但是其 value 为 null,那么此消息就是墓碑消息。日志清理线程发现墓碑消息时会先进行常规的清理,并保留墓碑消息一段时间。

# 8.日志整理策略

LogCompact 日志整理

compact 日志压缩:对于相同 key 的不同 value 值,只保留最后一个版本。类似 redis 中的 aof 重写

  • log.cleanup.policy= compact 所有数据启用压缩策略

image-20221027001947559

# 9.高效读写数据

  1. Kafka 本身是分布式集群,可以采用分区技术,并行度高
  2. 读数据采用稀疏索引,可以快速定位要消费的数据
  3. 顺序写磁盘,Kafka 的 producer 生产数据,要写入到 log 文件中,写的过程是一直追加到文件末端,为顺序写。官网有数据表明,同样的磁盘,顺序写能到 600M/s,而随机写只有 100K/s。这与磁密的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。

# 10.页缓存和零拷贝

零拷贝:Kafka 的数据加工处理操作交由 Kafka 生产者和 Katka 消费者处理。Kafka Broker 应用层不关心存储的数据,所以就不用走应用层,传输效率高。

PageCache 页缓存:Katka 重度依赖底层操作系统提供的 PageCache 功能。当上层有写操作时,操作系统只是将数据写入 PageCache。 当读操作发生时,先从 PageCache 中查找,如果找不到,再去磁盘中读取。实际上 PageCache 是把尽可能多的空闲内存都当做了磁盘缓存来使用。

页缓存是操作系统实现的一种主要的磁盘缓存 ,以此用来减少对磁盘 I/O 操作,就是把磁盘中的数据缓存到内存中,把对磁盘的访问变为对内存的访问。

当一个进程准备读取磁盘上的文件内容时,操作系统会先查看待读取的数据所在的页 (page)是否在页缓存(pagecache)中,如果存在(命中) 则直接返回数据,从而避免了对物理磁盘的 I/O 操作;如果没有命中,则操作系统会向磁盘发起读取请求并将读取的数据页存入页缓存,之后再将数据返回给进程。同样,如果一个进程需要将数据写入磁盘,那么操作系统也会检测数据对应的页是否在页缓存中,如果不存在,则会先在页缓存中添加相应的页,最后将数据写入对应的页。被修改过后的页也就变成了脏页,操作系统会在合适的时间把脏页中的数据写入磁盘,以保持数据的一致性 。

对一个进程而言,它会在进程内部缓存处理所需的数据,然而这些数据有可能还缓存在操作系统的页缓存中,因此同一份数据有可能被缓存了两次。并且除非使用 Direct I/O 的方式, 否则页缓存很难被禁止。此外,用过 Java 的人一般都知道两点事实:对象的内存开销非常大,通常会是真实数据大小的几倍甚至更多,空间使用率低下;Java 的垃圾回收会随着堆内数据的增多而变得越来越慢。基于这些因素,使用文件系统并依赖于页缓存的做法明显要优于维护一个进程内缓存或其他结构,至少我们可以省去了一份进程内部的缓存消耗,同时还可以通过结构紧凑的字节码来替代使用对象的方式以节省更多的空间。如此,我们可以在 32GB 的机器上使用 28GB 至 30GB 的内存而不用担心 GC 所带来的性能问题。此外,即使 Kafka 服务重启,页缓存还是会保持有效,然而进程内的缓存却需要重建。这样也极大地简化了代码逻辑,因为维护页缓存和文件之间的一致性交由操作系统来负责,这样会比进程内维护更加安全有效。

Kafka 中大量使用了页缓存,这是 Kafka 实现高吞吐的重要因素之一。 虽然消息都是先被写入页缓存,然后由操作系统负责具体的刷盘任务的,但在 Kafka 中同样提供了同步刷盘及间断性强制刷盘( fsync )的功能,这些功能可以通过 log.flush.interval.messages、log.flush.interval.ms 等参数来控制。同步刷盘可以提高消息的可靠性,防止由于机器断电等异常造成处于页缓存而没有及时写入磁盘的消息丢失。不过并不建议这么做,刷盘任务就应交由操作系统去调配,消息的可靠性应该由多副本机制来保障,而不是由同步刷盘这种严重影响性能的行为来保障。

image-20221027002304324

# 11.如何查找消息?

假如现在需要查找一个 offset 为 368801 的 message 是什么样的过程呢?

  1. 先找到 offset 的 368801message 所在的 segment 文件(利用二分法查找),这里找到的就是在第二个 segment 文件。

  2. 打开找到的 segment 中的.index 文件(也就是 368796.index 文件,该文件起始偏移量为 368796+1,我们要查找的 offset 为 368801 的 message 在该 index 内的偏移量为 368796+5=368801,所以这里要查找的相对 offset 为 5)。由于该文件采用的是稀疏索引的方式存储着相对 offset 及对应 message 物理偏移量的关系,所以直接找相对 offset 为 5 的索引找不到,这里同样利用二分法查找相对 offset 小于或者等于指定的相对 offset 的索引条目中最大的那个相对 offset,所以找到的是相对 offset 为 4 的这个索引。

  3. 根据找到的相对 offset 为 4 的索引确定 message 存储的物理偏移位置为 256。打开数据文件,从位置为 256 的那个地方开始顺序扫描直到找到 offset 为 368801 的那条 Message。

  4. 这套机制是建立在 offset 为有序的基础上,利用 segment+有序 offset+稀疏索引+二分查找+顺序查找等多种手段来高效的查找数据!至此,消费者就能拿到需要处理的数据进行处理了。

img

# 六.常见问题

# 1.kafka 为什么快?

写入:顺序写入和 MMF(Memory Mapped Files 内存映射文件),批量压缩,稀疏索引

MMFile 它的工作原理是直接利用操作系统的 Page 来实现文件到物理内存的直接映射。完成映射之后你对物理内存的操作会被同步到硬盘上

读取:基于 sendfile 实现零拷贝

  • kafka 是分布式集群,采用分区方式,并行操作
  • 读取数据采用稀疏索引,可以快速定位消费数据
  • 顺序写磁盘
  • 页缓冲零拷贝

Kafka 性能优化:

  1. 零拷贝网络和磁盘
  2. 优秀的网络模型,基于 Java NIO
  3. 高效的文件数据结构设计
  4. Parition 并行和可扩展
  5. 数据批量传输
  6. 数据压缩
  7. 顺序读写磁盘
  8. 无锁轻量级 offset

# 2.说说零拷贝?

image-20221130181855525

传统模式下,当需要对一个文件进行传输的时候,其具体流程细节如下:

  • 调用 read 函数,文件数据从磁盘经过 DMA copy 到内核缓冲区

  • read 函数返回,文件数据从内核缓冲区 cpu copy 到用户缓冲区

  • write 函数调用,将文件数据从用户缓冲区 cpu copy 到内核与 socket 相关的缓冲区

  • 数据从 socket 缓冲区 DMA copy 到网卡的 NIC Buffer

以上细节是传统 read/write 方式进行网络文件传输的方式,我们可以看到,在这个过程当中,文件数据实际上是经过了四次 copy 操作:硬盘—>内核 buf—>用户 buf—>socket 相关缓冲区—>网卡

image-20221110180758139

sendfile 系统调用则提供了一种减少以上多次 copy,提升文件传输性能的方法。在内核版本 2.1 中,引入了 sendfile 系统调用,以简化网络上和两个本地文件之间的数据传输。 sendfile 的引入不仅减少了数据复制,还减少了上下文切换。

  1. sendfile 系统调用,文件数据被 copy 至内核缓冲区
  2. 再从内核缓冲区 copy 至内核中 socket 相关的缓冲区
  3. 最后再 socket 相关的缓冲区 copy 到协议引擎

image-20221110180809391

DMA(Direct Memory Access,直接内存存取) 是所有现代计算机的重要特色,它允许不同速度的硬件装置直接沟通,而不需要依于 CPU 的大量中断负载。在现代计算机中,运算单元不再仅仅是 cpu。网卡/磁盘等都可以认为是 DMA 设备,是一个半自治单元,比如网卡有它自己的运算单元(相当于特异化的 cpu)和自己的缓存,网卡接收和发送数据时是不需要 cpu 的全程参与的,磁盘也是类似的。简单来讲就是 dma 设备就是 cpu 领导下的一个不太聪明的小弟,cpu 负责指挥小弟去干活,但干活的过程中是不需要 cpu 参与的。nio 和 0 拷贝都是为了解放 cpu。

Java 中的零拷贝是依靠java.nio.channels.FileChannel中的 transferTo(long position, long count, WritableByteChannel target)方法来实现的。transferTo 方法的底层实现是基于操作系统的 sendfile 这个 system call 来实现的,无需将数据拷贝到用户态,sendfile 负责把数据从某个 fd(file descriptor)传输到另一个 fd。这样就完成了零拷贝的过程。

mmap 和 sendfile 总结

  1. 都是 Linux 内核提供、实现零拷贝的 API;
  2. sendfile 是将读到内核空间的数据,转到 socket buffer,进行网络发送;
  3. mmap 将磁盘文件映射到内存,支持读和写,对内存的操作会反映在磁盘文件上。
  4. RocketMQ 在消费消息时,使用了 mmap。kafka 使用了 sendFile

mmap(Memory Mapped Files):是一种操作系统提供的内存映射文件的机制,允许应用程序将一个文件映射到其地址空间中,从而使得文件的内容可以直接在内存中读取和写入,而无需通过常规的读写操作来访问文件。这个机制在处理大文件、共享内存和提高文件 I/O 性能方面非常有用。

# 3.为什么去 zookeeper?

这样做的好处有以下几个:

  • Kafka 不再依赖外部框架,而是能够独立运行;
  • controller 管理集群时, 不再需要从 zookeeper 中 先读取数据,集群性能上升;
  • 由于不依赖 zookeeper ,集群扩展时不再受到 zookeeper 读写能力限制;
  • controller 不再动态选举,而是由配置文件规定。这样我们可以有针对性的加强 controller 节点的配置,而不是像以前一样对随机 controller 节点的高负载束手无策

# 4.什么是 kraft 架构?

左图为 Kafka 现有架构,元数据在 zookeeper 中,运行时动态选举 controller ,由 controller 进行 Kafka 集群管理。其中 Zookeeper 集群是 Kafka 用来负责集群元数据的管理、控制器的选举等

右图为 kraft 模式架构(实验性),不再依赖 zookeeper 集群,而是用三台 controller 节点代替 zookeeper ,元数据保存在 controller 中,由 controller 直接进行 Kafka 集群管理

image-20230811021332454

# 5.什么是死信队列?

由于某些原因消息无法被正确地投递,为了确保消息不会被无故地丢弃,一般将其置于一个特殊角色的队列,这个队列一般称为死信队列。

后续分析程序可以通过消费这个死信队列中的内容来分析当时遇到的异常情况,进而可以改善和优化系统。

# 6.kafka 与 RabbitMQ 的对比

RabbitMQ 是使用 Erlang 语言开发的开源消息队列系统,基于 AMQP 协议来实现。AMQP 的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP 协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。

下面是对比 Kafka 和 RabbitMQ 在您提供的几个方面的信息总结:

对比项 Kafka RabbitMQ
开发语言 Scala, Java Erlang
是否支持多租户
是否支持 topic 优先级 不支持 支持
是否支持消息全局有序 不支持 支持
是否支持消息分区有序 支持 支持
是否内置监控 无内置监控 内置监控
是否支持多个生产者 是,一个 topic 支持多个生产者 是,但需注意一些限制
是否支持多个消费者 是,一个 topic 支持多个消费者
是否支持一个分区多个消费者 不支持 不支持
是否支持 JMX 支持 不支持(非 Java 语言编写)
是否支持加密 支持 支持
消息队列协议支持 仅支持自定义协议 支持 AMQP、MQTT、STOMP 协议
客户端语言支持 支持多语言客户端 支持多语言客户端
是否支持消息追踪 不支持消息追踪 支持消息追踪
是否支持消费者推模式 不支持消费者推模式 支持消费者推模式
是否支持消费者拉模式 支持消费者拉模式 支持消费者拉模式
是否支持广播消息 支持广播消息 支持广播消息
是否支持消息回溯 不支持,消息确认被消费后会被删除 支持消息回溯,消息持久化,记录 offset 和 timestamp
是否支持消息数据持久化 支持消息数据持久化 支持消息数据持久化
是否支持消息堆积 支持消息堆积,并批量持久化到磁盘 堆积
是否支持流量控制 支持控制用户和客户端流量 支持生产者的流量控制
是否支持事务性消息 支持 不支持
元数据管理 通过 Zookeeper 进行管理 默认服务端口 9200
默认监控端口 Kafka Web Console 9000; Kafka Manager 9000 15672
网络开销 相对较小 相对较大
内存消耗 相对较小 相对较大
CPU 消耗 相对较大 相对较小

# 7.使用场景对比

以下场景比较适合使用 Kafka。如果有大量的事件(10 万以上/秒)、你需要以分区的,顺序的,至少传递成功一次到混杂了在线和打包消费的消费者、希望能重读消息、你能接受目前是有限的节点级别高可用就可以考虑 kafka。

  • 严格的消息顺序
  • 延长消息留存时间,包括过去消息重放的可能
  • 传统解决方案无法满足的高伸缩能力

以下场景比较适合使用 RabbitMQ。如果是较少的事件(2 万以上/秒)并且需要通过复杂的路由逻辑去找到消费者、你希望消息传递是可靠的、并不关心消息传递的顺序、而且需要现在就支持集群-节点级别的高可用就可以考虑 rabbitmq。

  • 高级灵活的路由规则
  • 消息时序控制,控制消息过期或消息延迟
  • 高级的容错处理能力,在消费者更有可能处理消息不成功的情景中(瞬时或持久)
  • 更简单的消费者实现

# 8.Follower 不提供读服务?

这个问题本质上是对性能和一致性的取舍,换句话说,相当于为什么不支持读写分离。试想一下,如果 Follower 副本也对外提供服务那会怎么样呢?首先,性能是肯定会有所提升的。但同时,会出现一系列问题。

类似数据库事务中的幻读,脏读。比如你现在写入一条数据到 kafka 主题 a,

消费者 b 从主题 a 消费数据,却发现消费不到,因为消费者 b 去读取的那个分区副本中,最新消息还没写入。而这个时候,另一个消费者 c 却可以消费到最新那条数据,因为它消费了 Leader 副本。Kafka 通过 WH 和 Offset 的管理来决定 Consumer 可以消费哪些数据,已经当前写入的数据。

# 9.怎么解决消息堆压?

  • 自身场景下,消息堆压是暂时的,消息堆压只是突发状况,就算不额外处理,随着时间流逝也可消费完毕。
  • 假如存在持续性消息堆压,可以考虑临时增加消费者的数量,提升消费者的消费能力。
  • 如果是线上突发问题,要临时扩容,增加消费端的数量
  • 降级一些非核心的业务
  • 优化消费端的业务处理逻辑

如果是线上突发问题,要临时扩容,增加消费端的数量,与此同时,降级一些非核心的业务。通过扩容和降级承担流量。其次,如通过监控,日志等手段分析是否消费端的业务逻辑代码出现了问题,优化消费端的业务处理逻辑。

# 10.kafka 为什么吞吐量高?

Apache Kafka 是一个高吞吐量、分布式、可扩展的发布-订阅消息系统,广泛用于构建实时数据管道和流处理应用。Kafka 能实现高吞吐量的主要原因有几个,这里列举并解释一些关键因素:

分布式架构:

  1. 多个 Broker: Kafka 集群由多个服务器(Broker)组成,每个 Broker 可以处理更多的消息。
  2. 分区(Partition): 每个主题(Topic)可以分成多个分区,每个分区可以独立地读写,提高并行性。

顺序写入和存储优化:

  1. 写入优化: Kafka 使用了顺序写入的方式来减少磁盘 I/O 操作,因此可以高效地写入数据。
  2. Page Cache: 读写操作高度优化,可以利用操作系统的 Page Cache,减少实际的磁盘操作次数。

高效的读取操作:

  1. 消费者偏移(Consumer Offset): Kafka 记录消费者读取的位置,所以读操作非常快。
  2. Pull 模型: Kafka 使用 Pull 模型而不是 Push 模型,消费者根据需要拉取消息,这样更容易控制流量和处理压力。

批处理与压缩:

  1. 批量发送和接收: Kafka 支持消息的批量发送和接收,这减少了网络调用的开销。
  2. 消息压缩: 支持多种压缩算法(如 Gzip、Snappy 等),减少网络传输的数据量。

其他:

  1. 异步通信: Kafka 支持异步生产和消费,减少等待时间,提高吞吐量。
  2. 可配置的副本: 通过数据副本,Kafka 不仅确保了数据的高可用性,也允许更多的并行读取操作。

这些因素共同作用,使 Kafka 成为一个高吞吐量的消息队列系统。这些设计选择背后的底层原理主要关注于减少磁盘 I/O、网络 I/O 以及各种等待和锁操作,从而最大化单机和集群的性能表现。

上次更新: 11/26/2024, 10:00:19 PM