# 一.主题命令
# 1.操作主题命令参数
bin/kafka-topics.sh
1
参数 | 描述 |
---|---|
--bootstrap-server <String: server to connect to> | 连接的 Kafka Broker 主机名称和端口号 |
--topic <String: topic> | 操作的 topic 名称 |
--create | 创建主题 |
--delete | 删除主题 |
--alter | 修改主题 |
--list | 主题列表 |
--describe | 查看主题详细描述 |
--partitions <Integer: # of partitions> | 设置分区数 |
--replication-factor <Integer: replication factor> | 设置分区副本 |
--config <String: name=value> | 更新系统默认的配置 |
# 2.创建 first topic
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --topic first --create --partitions 1 --replication-factor 3
1
# 3.查看主题的详情
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --topic first --describe
1
# 4.修改分区数
(注意:分区数只能增加,不能减少)
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic first --partitions 3
1
# 5.再次查看主题的详情
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --topic first --describe
1
# 6.常用命令
# Create a topic with 3 partitions and a replication factor of 2
kafka-topics.sh --bootstrap-server Ava01:9092 --create --replication-factor 2 --partitions 3 --topic zzz
# Show detailed information about a topic
kafka-topics.sh --bootstrap-server Ava01:9092 --describe --topic zzz
# List all topics
kafka-topics.sh --bootstrap-server Ava01:9092 --list
# Change the partition count. It can only be increased, never decreased, so the
# new value must be larger than the 3 partitions the topic was created with.
kafka-topics.sh --bootstrap-server Ava01:9092 --alter --topic zzz --partitions 4
# Delete a topic
kafka-topics.sh --bootstrap-server Ava01:9092 --delete --topic zzz
# Send messages
kafka-console-producer.sh --broker-list Ava01:9092 --topic first
# Consume messages
kafka-console-consumer.sh --bootstrap-server Ava01:9092 --topic zzz
# Trigger a preferred-replica election to rebalance after a node went down
kafka-preferred-replica-election.sh --bootstrap-server Ava01:9092
# Show the latest offset of every partition of the given topic
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list Ava01:9092 --topic test --time -1
# Start a single Kafka broker as a daemon
bin/kafka-server-start.sh -daemon config/server.properties
# Consume the __consumer_offsets topic; --from-beginning reads from the very start.
# The formatter class name is single-quoted so the shell leaves the '$' untouched.
kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server Ava01:9092 \
  --consumer.config config/consumer.properties \
  --formatter 'kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter' \
  --from-beginning >>/opt/module/kafka-2.4.1/offset/kafka_offset.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# 二.生产者
# 1.查看生产者命令
bin/kafka-console-producer.sh
1
bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
1
# 2.broker 信息
#连接的 Kafka Broker 主机名称和端口号。
--bootstrap-server <String: server to connect to>
1
2
2
# 3.topic 信息
#操作的 topic 名称。
--topic <String: topic>
1
2
2
# 三.消费者
# 1.查看消费者命令
bin/kafka-console-consumer.sh
1
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
1
# 2.broker 信息
#连接的 Kafka Broker 主机名称和端口号。
--bootstrap-server <String: server to connect to>
1
2
2
# 3.topic 信息
#操作的 topic 名称。
--topic <String: topic>
1
2
2
# 4.普通消费消息
/**
 * Basic consumer example: subscribes to the "Hello-Kafka" topic and prints
 * every record it receives, polling in an endless loop.
 */
public class CustomConsumer_01 {
    public static void main(String[] args) {
        // 1. Consumer configuration
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091,localhost:9092,localhost:9093");
        // A consumer needs *deserializers*; the producer-side serializer settings
        // used previously left key.deserializer / value.deserializer unset.
        // Class names are given as strings so no extra import is required.
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Consumer group id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group.demo");
        // Consumer client id
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer.client.id.demo");
        // 2. Create the consumer; try-with-resources closes it if the loop
        // is left through an exception.
        try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties)) {
            // Subscribe to the topic
            kafkaConsumer.subscribe(ImmutableList.of("Hello-Kafka"));
            while (true) {
                ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(String.format("topic=%s, partition=%s, offset=%s, key=%s, value=%s",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value()));
                }
                // Pause between polls
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 5.多线程消费
/**
 * Multi-threaded consumer example: starts four threads, each owning its own
 * KafkaConsumer subscribed to "Hello-Kafka" within the same consumer group.
 */
public class CustomConsumer_02 {
    public static void main(String[] args) {
        // 1. Shared consumer configuration
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091,localhost:9092,localhost:9093");
        // A consumer needs *deserializers*; the producer-side serializer settings
        // used previously left key.deserializer / value.deserializer unset.
        // Class names are given as strings so no extra import is required.
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Consumer group id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group.demo");
        // Multi-threading: one consumer per thread
        String[] threadNames = {"thread-one", "thread-two", "thread-three", "thread-four"};
        for (String threadName : threadNames) {
            // Each consumer gets its own copy of the settings with a distinct
            // client id, so the consumers in this JVM do not share one client.id.
            Properties perThread = new Properties();
            perThread.putAll(properties);
            perThread.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer.client.id.demo-" + threadName);
            new Thread(new KafkaConsumerProcess(perThread, "Hello-Kafka"), threadName).start();
        }
    }

    /**
     * Runnable that owns one KafkaConsumer and prints every record it polls.
     */
    public static class KafkaConsumerProcess implements Runnable {
        private final KafkaConsumer<String, String> consumer;

        /**
         * @param configs consumer configuration passed to the KafkaConsumer constructor
         * @param topic   topic to subscribe to
         */
        public KafkaConsumerProcess(Map configs, String topic) {
            this.consumer = new KafkaConsumer<String, String>(configs);
            this.consumer.subscribe(ImmutableList.of(topic));
        }

        @Override
        public void run() {
            try {
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println(String.format("thread=%s, topic=%s, partition=%s, offset=%s, key=%s, value=%s",
                                Thread.currentThread().getName(), record.topic(), record.partition(), record.offset(),
                                record.key(), record.value()));
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                // Always release the consumer when the loop ends
                consumer.close();
            }
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# 6.指定分区消费
KafkaConsumer consumer = new KafkaConsumer(properties);
// Assign partition 0 of the "Hello-Kafka" topic to this consumer
TopicPartition partition = new TopicPartition("Hello-Kafka", 0);
consumer.assign(ImmutableList.of(partition));
1
2
3
2
3
# 7.正则匹配
KafkaConsumer consumer = new KafkaConsumer(configs);
// Subscribe using a regular expression for the topic names
Pattern topicPattern = Pattern.compile("Hello-.*");
consumer.subscribe(topicPattern);
1
2
3
2
3
# 8.自定义反序列化
/**
 * Consumer example using a custom value deserializer: subscribes to every
 * topic matching "Hello-.*" and prints each record while isRunning is true.
 */
public class CustomConsumer_05 {
    /** Loop flag; set to false to make main leave its poll loop. */
    public static final AtomicBoolean isRunning = new AtomicBoolean(true);

    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ServerConfig.BOOTSTRAP_SERVERS_CONFIG);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "group.demo");
        configs.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer.client.id.demo");
        // Register the custom value deserializer
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CompanyDeserializer.class.getName());
        // Values come from CompanyDeserializer, so the value type is Company.
        // try-with-resources closes the consumer once the loop ends.
        try (KafkaConsumer<String, Company> consumer = new KafkaConsumer<>(configs)) {
            // Regex subscription
            consumer.subscribe(Pattern.compile("Hello-.*"));
            while (isRunning.get()) {
                ConsumerRecords<String, Company> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, Company> record : records) {
                    System.out.println(String.format("topic=%s, partition=%s, offset=%s, key=%s, value=%s",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value()));
                }
                // Pause between polls
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
 * Deserializer that rebuilds a Company from a byte array laid out as
 * [int nameLen][name bytes][int addressLen][address bytes].
 */
public class CompanyDeserializer implements Deserializer<Company> {
    /** Name of the text encoding; kept for callers that reference it. */
    public static final String ENCODING = "UTF8";

    /**
     * @param topic topic the record came from (not used)
     * @param data  serialized bytes, may be null or empty
     * @return the decoded Company, or null when data is null or empty
     */
    @Override
    public Company deserialize(String topic, byte[] data) {
        if (data == null || data.length == 0) {
            return null;
        }
        ByteBuffer buffer = ByteBuffer.wrap(data);
        // Fields are read in the order they appear in the buffer: name, then address
        String name = readString(buffer);
        String address = readString(buffer);
        return new Company(name, address);
    }

    /** Reads one length-prefixed UTF-8 string from the buffer's current position. */
    private static String readString(ByteBuffer buffer) {
        byte[] bytes = new byte[buffer.getInt()];
        buffer.get(bytes);
        // The Charset overload cannot throw UnsupportedEncodingException, so a
        // failed lookup can no longer be swallowed and turned into empty fields.
        return new String(bytes, java.nio.charset.StandardCharsets.UTF_8);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28