如果 $PATH
中没有 KAFKA 的bin
路径,那首先要将其加入 $PATH
中。
export PATH=$KAFKA_HOME/bin:$PATH
集群和 Broker 命令
## 查看 Kafka broker 版本
kafka-broker-api-versions --bootstrap-server localhost:9092 --version
## 连接到 Zookeeper
zookeeper-shell zookeeper:2181
## 显示 Kafka 集群 ID
zookeeper-shell zookeeper:2181 get /cluster/id
## 查看 Kafka 集群中的所有 Broker
zookeeper-shell zookeeper:2181 ls /brokers/ids
## 查看 Kafka 集群中的所有 topic
zookeeper-shell zookeeper:2181 ls /brokers/topics
## 查看 Kafka 集群中某个 topic 的细节
zookeeper-shell zookeeper:2181 get /brokers/topics/my-topic
Topic 命令
## 【创建】一个名为 my-topic 的 topic
kafka-topics --bootstrap-server localhost:9092 --create --topic my-topic --partitions 3 --replication-factor 3
## 如果名为 my-topic 的 topic 【不存在则创建】,注意,如果 topic 已经存在则不作任何提示
kafka-topics --bootstrap-server localhost:9092 --create --topic my-topic --partitions 3 --replication-factor 1 --if-not-exists
## 列出 Kafka 中的【所有 topic】
kafka-topics --bootstrap-server localhost:9092 --list
## 列出 Kafka 中除内部 topic 外的所有 topic
kafka-topics --bootstrap-server localhost:9092 --list --exclude-internal
## 显示 Kafka topic 【详情】
kafka-topics --bootstrap-server localhost:9092 --topic my-topic --describe
## 增加 Kafka topic 分区,注意:topic 分区只能增加,不能减少
kafka-topics --bootstrap-server localhost:9092 --alter --topic my-topic --partitions 5
## 更改 Kafka topic 的【留存期限】(默认是 7 天 604800000 毫秒,如果不限制,可设置为 -1)
kafka-configs --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name my-topic --add-config retention.ms=259200000
## 【清空】 Kafka topic,目前没有方法直接清空 topic,但是可以通过修改留存时间来达到这个目的。如下命令会在 1 秒后清除所有之前的数据
kafka-configs --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name my-topic --add-config retention.ms=1000
## 恢复留存时间为默认值(7 天)
kafka-configs --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name my-topic --delete-config retention.ms
## 列出 Kafka 中所有配置了 overrides 的 topic
kafka-topics --bootstrap-server localhost:9092 --describe --topics-with-overrides
## 显示 Kafka 特定 topic 的 overrides 配置
kafka-configs --bootstrap-server localhost:9092 --describe --entity-type topics --entity-name my-topic
## 【删除】 Kafka topic
kafka-topics --bootstrap-server localhost:9092 --delete --topic my-topic
生产者命令
## 打开 Kafka topic 命令行,在命令行之后输入内容再回车就会生产一条消息到 topic 中
kafka-console-producer --broker-list localhost:9092 --topic my-topic
>Hello World
## 从【文件中】生产消息到 Kafka topic
kafka-console-producer --broker-list localhost:9092 --topic my-topic < topic-input.txt
## 生产【键值对】消息,并以 : 作为分割
kafka-console-producer --broker-list localhost:9092 --topic some-topic --property parse.key=true --property key.separator=:
>name:tony
>age:22
消费者命令
##【 消费】 Kafka topic 中的消息。如果消费者没有指定 group id,则会启动一个随机的组
kafka-console-consumer --bootstrap-server localhost:9092 --topic my-topic
## 消费 Kafka topic 中的键值对消息,并同时【显示键和值】(默认只显示值)
kafka-console-consumer --bootstrap-server localhost:9092 \
--topic my-topic \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true
## 【从头消费】 Kafka topic 中的消息
kafka-console-consumer --bootstrap-server localhost:9092 --topic my-topic --from-beginning
其他命令
找到所有与领导者不同步的一个或多个副本分区。
kafka-topics --bootstrap-server localhost:9092 --describe --under-replicated-partitions