# 一.主题命令

# 1.操作主题命令参数

bin/kafka-topics.sh
1
参数 描述
--bootstrap-server <String: server to connect to> 连接的 Kafka Broker 主机名称和端口号
--topic <String: topic> 操作的 topic 名称
--create 创建主题
--delete 删除主题
--alter 修改主题
--list 主题列表
--describe 查看主题详细描述
--partitions <Integer: # of partitions> 设置分区数
--replication-factor <Integer: replication factor> 设置分区副本数
--config <String: name=value> 更新系统默认的配置

# 2.创建 first topic

bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --topic first --create --partitions 1 --replication-factor 3
1

# 3.查看主题的详情

bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --topic first --describe
1

# 4.修改分区数

(注意:分区数只能增加,不能减少)

bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic first --partitions 3
1

# 5.再次查看主题的详情

bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --topic first --describe
1

image-20221021164157285

# 6.常用命令

# Create a topic (3 partitions, replication factor 2)
kafka-topics.sh --bootstrap-server Ava01:9092 --create --replication-factor 2 --partitions 3 --topic zzz

# Show detailed information for a topic
kafka-topics.sh --bootstrap-server Ava01:9092 --describe --topic zzz

# List all topics
kafka-topics.sh --bootstrap-server Ava01:9092 --list

# Change the partition count. It can only be increased, never decreased, and
# the new value must be strictly greater than the current one: the topic above
# was created with 3 partitions, so asking for 3 again would be rejected.
kafka-topics.sh --bootstrap-server Ava01:9092 --alter --topic zzz --partitions 4

# Delete a topic
kafka-topics.sh --bootstrap-server Ava01:9092 --delete --topic zzz

# Produce messages from the console
kafka-console-producer.sh --broker-list Ava01:9092 --topic first

# Consume messages from the console
kafka-console-consumer.sh --bootstrap-server Ava01:9092 --topic zzz

# Rebalance partition leadership after a broker has gone down
kafka-preferred-replica-election.sh --bootstrap-server Ava01:9092

# Show the latest offset of every partition of a topic (--time -1 means "latest")
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list Ava01:9092 --topic test --time -1

# Start a single Kafka broker as a daemon
bin/kafka-server-start.sh -daemon config/server.properties

# Consume the internal __consumer_offsets topic; --from-beginning reads from the
# earliest offset. Output is appended to the file, whose directory must already exist.
kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server  Ava01:9092 --consumer.config config/consumer.properties  --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning >>/opt/module/kafka-2.4.1/offset/kafka_offset.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

# 二.生产者

# 1.查看生产者命令

bin/kafka-console-producer.sh
1
bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
1

# 2.broker 信息

#连接的 Kafka Broker 主机名称和端口号。
--bootstrap-server <String: server to connect to>
1
2

# 3.topic 信息

#操作的 topic 名称。
--topic <String: topic>
1
2

# 三.消费者

# 1.查看消费者命令

bin/kafka-console-consumer.sh
1
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
1

# 2.broker 信息

#连接的 Kafka Broker 主机名称和端口号。
--bootstrap-server <String: server to connect to>
1
2

# 3.topic 信息

#操作的 topic 名称。
--topic <String: topic>
1
2

# 4.普通消费消息

public class CustomConsumer_01 {
    public static void main(String[] args) {
        //1.配置文件
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091,localhost:9092,localhost:9093");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CompanySerializer.class);
        // 设置消费者组ID
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group.demo");
        // 设置消费者ID
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer.client.id.demo");
        //2.创建生产者
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        // 订阅主题
        kafkaConsumer.subscribe(ImmutableList.of("Hello-Kafka"));
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(String.format("topic=%s, partition=%s, offset=%s, key=%s, value=%s",
                        record.topic(), record.partition(), record.offset(), record.key(), record.value()));
            }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

# 5.多线程消费

public class CustomConsumer_02 {
    public static void main(String[] args) {
        //1.配置文件
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091,localhost:9092,localhost:9093");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CompanySerializer.class);
        // 设置消费者组ID
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group.demo");
        // 设置消费者ID
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer.client.id.demo");
        //多线程
        new Thread(new KafkaConsumerProcess(properties, "Hello-Kafka"), "thread-one").start();
        new Thread(new KafkaConsumerProcess(properties, "Hello-Kafka"), "thread-two").start();
        new Thread(new KafkaConsumerProcess(properties, "Hello-Kafka"), "thread-three").start();
        new Thread(new KafkaConsumerProcess(properties, "Hello-Kafka"), "thread-four").start();
    }

    /**
     * 多线程内部类
     */
    public static class KafkaConsumerProcess implements Runnable {
        private KafkaConsumer<String, String> consumer;

        public KafkaConsumerProcess(Map configs, String topic) {
            this.consumer = new KafkaConsumer<String, String>(configs);
            this.consumer.subscribe(ImmutableList.of(topic));
        }

        @Override
        public void run() {
            try {
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println(String.format("thread=%s, topic=%s, partition=%s, offset=%s, key=%s, value=%s",
                                Thread.currentThread().getName(), record.topic(), record.partition(), record.offset(),
                                record.key(), record.value()));
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                consumer.close();
            }
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48

# 6.指定分区消费

// Create a consumer from `properties` (defined outside this snippet).
// NOTE(review): KafkaConsumer is used as a raw type here — consider adding type arguments.
KafkaConsumer consumer = new KafkaConsumer(properties);
// Explicitly assign partition 0 of topic "Hello-Kafka" to this consumer
// (assign() is used here instead of subscribe()).
consumer.assign(ImmutableList.of(new TopicPartition("Hello-Kafka", 0)));
1
2
3

# 7.正则匹配

// Create a consumer from `configs` (defined outside this snippet).
// NOTE(review): KafkaConsumer is used as a raw type here — consider adding type arguments.
KafkaConsumer consumer = new KafkaConsumer(configs);
// Subscribe by regular expression: the pattern is "Hello-.*".
consumer.subscribe(Pattern.compile("Hello-.*"));
1
2
3

# 8.自定义反序列化

/**
 * Consumer that uses a custom value deserializer (CompanyDeserializer) and
 * subscribes to every topic matching the pattern "Hello-.*".
 */
public class CustomConsumer_05 {
    /** Loop flag; set to false to make the poll loop in main() stop. */
    public static final AtomicBoolean isRunning = new AtomicBoolean(true);

    /**
     * Builds the consumer configuration, subscribes by pattern, and prints
     * every polled record until {@link #isRunning} becomes false or the
     * thread is interrupted.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ServerConfig.BOOTSTRAP_SERVERS_CONFIG);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "group.demo");
        configs.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer.client.id.demo");
        // Register the custom value deserializer.
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CompanyDeserializer.class.getName());

        // The value type is Company because that is what CompanyDeserializer returns.
        // try-with-resources closes the consumer on every exit path.
        try (KafkaConsumer<String, Company> consumer = new KafkaConsumer<>(configs)) {
            // Subscribe by regular expression.
            consumer.subscribe(Pattern.compile("Hello-.*"));
            while (isRunning.get()) {
                ConsumerRecords<String, Company> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, Company> record : records) {
                    System.out.println(String.format("topic=%s, partition=%s, offset=%s, key=%s, value=%s",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value()));
                }
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop polling so the consumer gets closed.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class CompanyDeserializer implements Deserializer<Company> {

    public static final String ENCODING = "UTF8";

    @Override
    public Company deserialize(String topic, byte[] data) {
        if (data == null || data.length <= 0) {
            return null;
        }
        ByteBuffer buffer = ByteBuffer.wrap(data);
        int nameLen, addressLen;
        String name = "", address = "";
        nameLen = buffer.getInt();
        byte[] nameBytes = new byte[nameLen];
        buffer.get(nameBytes);
        addressLen = buffer.getInt();
        byte[] addressBytes = new byte[addressLen];
        buffer.get(addressBytes);
        try {
            name = new String(nameBytes, ENCODING);
            address = new String(addressBytes, ENCODING);
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }

        return new Company(name, address);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
上次更新: 11/26/2024, 10:00:19 PM