Kafka 命令大全

1.4k 记录 发表评论

如果 $PATH 中没有 KAFKA 的bin路径,那首先要将其加入 $PATH 中。

export PATH=$KAFKA_HOME/bin:$PATH

集群和 Broker 命令

## 查看 Kafka broker 版本
kafka-broker-api-versions --bootstrap-server localhost:9092 --version

## 连接到 Zookeeper
zookeeper-shell zookeeper:2181

## 显示 Kafka 集群 ID
zookeeper-shell zookeeper:2181 get /cluster/id

## 查看 Kafka 集群中的所有 Broker
zookeeper-shell zookeeper:2181 ls /brokers/ids

## 查看 Kafka 集群中的所有 topic
zookeeper-shell zookeeper:2181 ls /brokers/topics

## 查看 Kafka 集群中某个 topic 的细节
zookeeper-shell zookeeper:2181 get /brokers/topics/my-topic

Topic 命令

## 【创建】一个名为 my-topic 的 topic
kafka-topics --bootstrap-server localhost:9092 --create --topic my-topic --partitions 3 --replication-factor 3

## 如果名为 my-topic 的 topic 【不存在则创建】,注意,如果 topic 已经存在则不作任何提示
kafka-topics --bootstrap-server localhost:9092 --create --topic my-topic --partitions 3 --replication-factor 1 --if-not-exists

## 列出 Kafka 中的【所有 topic】
kafka-topics --bootstrap-server localhost:9092 --list

## 列出 Kafka 中除内部 topic 外的所有 topic
kafka-topics --bootstrap-server localhost:9092 --list --exclude-internal

## 显示 Kafka topic 【详情】
kafka-topics --bootstrap-server localhost:9092 --topic my-topic --describe

## 增加 Kafka topic 分区,注意:topic 分区只能增加,不能减少
kafka-topics --bootstrap-server localhost:9092 --alter --topic my-topic --partitions 5

## 更改 Kafka topic 的【留存期限】(默认是 7 天 604800000 毫秒,如果不限制,可设置为 -1)
kafka-configs --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name my-topic --add-config retention.ms=259200000

## 【清空】 Kafka topic,目前没有方法直接清空 topic,但是可以通过修改留存时间来达到这个目的。如下命令会在 1 秒后清除所有之前的数据
kafka-configs --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name my-topic --add-config retention.ms=1000

## 恢复留存时间为默认值(7 天)
kafka-configs --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name my-topic --delete-config retention.ms

## 列出 Kafka 中所有配置了 overrides 的 topic
kafka-topics --bootstrap-server localhost:9092 --describe --topics-with-overrides

## 显示 Kafka 特定 topic 的 overrides 配置
kafka-configs --bootstrap-server localhost:9092 --describe --entity-type topics --entity-name my-topic

## 【删除】 Kafka topic
kafka-topics --bootstrap-server localhost:9092 --delete --topic my-topic

生产者命令

## 打开 Kafka topic 命令行,在命令行之后输入内容再回车就会生产一条消息到 topic 中
kafka-console-producer --broker-list localhost:9092 --topic my-topic
  >Hello World

## 从【文件中】生产消息到 Kafka topic
kafka-console-producer --broker-list localhost:9092 --topic my-topic < topic-input.txt

## 生产【键值对】消息,并以 : 作为分割
kafka-console-producer --broker-list localhost:9092 --topic some-topic --property parse.key=true --property key.separator=:
  >name:tony
  >age:22

消费者命令

##【 消费】 Kafka topic 中的消息。如果消费者没有指定 group id,则会启动一个随机的组
kafka-console-consumer --bootstrap-server localhost:9092 --topic my-topic

## 消费 Kafka topic 中的键值对消息,并同时【显示键和值】(默认只显示值)
kafka-console-consumer --bootstrap-server localhost:9092 \
  --topic my-topic \
  --formatter kafka.tools.DefaultMessageFormatter \
  --property print.key=true \
  --property print.value=true

## 【从头消费】 Kafka topic 中的消息
kafka-console-consumer --bootstrap-server localhost:9092 --topic my-topic --from-beginning

其他命令

找到所有与领导者不同步的一个或多个副本分区。

kafka-topics --bootstrap-server localhost:9092 --describe --under-replicated-partitions

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注

昵称 *