kafka执行脚本默认在安装的bin目录下,本文中示例均基于bin目录执行。
#查询topic状态,新建,删除,扩容 kafka-topics.sh #查看,修改kafka配置 kafka-configs.sh #配置,查看kafka集群鉴权信息 kafka-acls.sh #生产命令 kafka-console-producer.sh # 消费命令 kafka-console-consumer.sh #查看消费者组,重置消费位点等 kafka-consumer-groups.sh #kafka自带消费性能测试命令 kafka-consumer-perf-test.sh # #kafka集群间同步命令 kafka-mirror-maker.sh #重新选举topic分区leader kafka-preferred-replica-election.sh #kafka自带生产性能测试命令 kafka-producer-perf-test.sh #kafka数据重平衡命令 kafka-reassign-partitions.sh #kafka执行脚本 kafka-run-class.sh #进程启动 kafka-server-start.sh #进程停止 kafka-server-stop.sh
使用kafka-topics.sh。
./kafka-topics.sh --create --bootstrap-server 192.168.1.12:9092 --replication-factor 3 --partitions 2 --topic my-test-topic
参数说明:
–create:创建主题;
–bootstrap-server:指定kafka服务器地址;
–replication-factor:副本数量,注意不能大于broker数量;如果不提供,则会用集群中默认配置;
–partitions:分区数量,当创建或者修改topic的时候,用这个来指定分区数;如果创建的时候没有提供参数,则用集群中默认值; 注意如果是修改的时候,分区比之前小会有问题;
–topic
–config
./kafka-topics.sh --bootstrap-server 192.168.1.12:9092 --delete --topic my-test-topic
参数说明:
也可以理解为topic分区扩容。
# 修改单个topic ./kafka-topics.sh --bootstrap-server 192.168.1.12:9092 --alter --topic my-test-topic --partitions 6 # 批量修改topic ./kafka-topics.sh --topic ".*?" --bootstrap-server 192.168.1.12:9092 --alter --partitions 6
参数说明:
# 查询单个描述 ./kafka-topics.sh --topic my-test-topic --bootstrap-server 192.168.1.12:9092 --describe --exclude-internal # 查询所有描述 ./kafka-topics.sh --topic ".*?" --bootstrap-server 192.168.1.12:9092 --describe --exclude-internal
参数说明:
# 查询所有主题 ./kafka-topics.sh --bootstrap-server 192.168.1.12:9092 --list --exclude-internal # 查询特定主题 ./kafka-topics.sh --bootstrap-server 192.168.1.12:9092 --list --exclude-internal --topic "my-test-topic*"
参数说明:
使用kafka-configs.sh。
# 查询单个topic配置 ./kafka-configs.sh --describe --bootstrap-server 192.168.1.12:9092 --topic my-test-topic # 查询所有topic动态配置 ./kafka-configs.sh --describe --bootstrap-server 192.168.1.12:9092 --entity-type topics # 查询所有topic动态+静态配置 ./kafka-configs.sh --describe --bootstrap-server 192.168.1.12:9092 --entity-type topics --all # 查询版本信息 ./kafka-configs.sh --describe --bootstrap-server 192.168.1.12:9092 --version
# 添加配置(--add-config) ./kafka-configs.sh --bootstrap-server 192.168.1.12:9092 --alter --entity-type topics --entity-name my-test-topic --add-config file.delete.delay.ms=222222,retention.ms=999999 # 删除配置(--delete-config) ./kafka-configs.sh --bootstrap-server 192.168.1.12:9092 --alter --entity-type topics --entity-name my-test-topic --delete-config file.delete.delay.ms,retention.ms
参数说明:
./kafka-console-producer.sh --bootstrap-server 192.168.1.12:9092 --topic my-test-topic >hello kafka >
可选参数说明:
# 消费特定topic ./kafka-console-consumer.sh --bootstrap-server 192.168.1.12:9092 --topic my-test-topic # 读取所有数据 ./kafka-console-consumer.sh --bootstrap-server 192.168.1.12:9092 --from-beginning --topic my-test-topic
参数说明:
# 发送 5000000 条大小为 1KB 的消息到地址192.168.1.12:9092 ./kafka-producer-perf-test.sh --topic my-test-topic --num-records 5000000 --record-size 1024 --throughput -1 --producer-props bootstrap.servers=192.168.1.12:9092
参数说明:
./kafka-consumer-perf-test.sh --bootstrap-server 192.168.1.12:9092,192.168.1.13:9092,192.168.1.14:9092 --topic my-test-topic --messages 5000000 --threads 32 --reporting-interval 10000 --show-detailed-stats
参数说明: