kafka命令总结
topic的相关操作
创建topic
-
创建一个叫做“test”的topic,它只有二个分区,一个副本
-
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test
名词 解释 --zookeeper 后面加zookeeper的地址 --replication-factor 副本数 --partitions 分区数 --topic topic的名字
删除topic
- bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic 【topic name】
修改topic
- bin/kafka-topics.sh --zookeeper localhost:2181 --alter --partitions 20 --topic test
- 修改topic为test的分区为20
查看topic
-
bin/kafka-topics.sh --list --zookeeper localhost:2181
-
查看集群中 topic的数量
查看某个topic的详细内容
- bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
消费数据
消费数据相关
- bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test <--from-beginning>
--from-beginning 从头开始 - bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group cjpt
- bin/kafka-consumer-groups.sh --group cjpt-test-htga --describe --zookeeper localhost:2181
- 2.2版本后的
- bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.64:9092 --from-beginning --topic test
- 查看分区情况
- bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic test_topic --time -1 --broker-list localhost:9092 --partitions 5
- 从头开始消费
- bin/kafka-console-consumer.sh --zookeeper 10.4.2.66:2181 --from-beginning --topic test --consumer.config config/consumer.properties
查看消费情况
- bin/kafka-consumer-groups.sh --group testGroup --describe --bootstrap-server localhost:9092
- 查看testGroup的消费情况的
- bin/kafka-consumer-groups.sh --zookeeper 10.4.2.66:2181 --describe --group test-consumer-group
查看数据的生产情况
- bin/kafka-console-producer.sh --broker-list docker-master:9092,docker-slave01:9092,docker-slave02:9092 --topic topic_test
生产数据
- bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
kafka性能测试
生产性能测试
bin/kafka-producer-perf-test.sh --producer-props bootstrap.servers=192.168.1.63:9092,192.168.1.64:9092,192.168.1.65:9092 --record-size 102400 --num-records 10000 --throughput -1 --topic test
消费性能测试
bin/kafka-consumer-perf-test.sh --zookeeper 192.168.1.63:2181,192.168.1.64:2181,192.168.1.65:2181 --broker-list 192.168.1.63:9092,192.168.1.64:9092,192.168.1.65:9092 --fetch-size 1024 --messages 100000 --topic test --threads 3 --group producer_consumer
kafka权限
赋Consumer权限命令
1bin/kafka-acls.sh --authorizer-properties zookeeper.connect=<ZooKeeper集群IP:24002/kafka > --add --allow-principal User:<用户名> --consumer --topic <Topic名称> --group <消费者组名称>
赋Producer权限命令,由管理员用户操作。
1bin/kafka-acls.sh --authorizer-properties zookeeper.connect=<ZooKeeper集群IP:24002/kafka > --add --allow-principal User:<用户名> --producer --topic <Topic名称>
分区计划
1修改topic的备份数量
2//校验是否正确
3bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file ./topic-test.json --verify
4
5输出信息:修改之前报这个信息,说备份数量和文本不一致(因为文本内容是要添加副本数,肯定不一致),证明文本没有问题,可以执行
6
7[root@cjmaster01 kafka_2.11-0.9.0.1]# bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file ./topic-test.json --verify
8Status of partition reassignment:
9ERROR: Assigned replicas (0) don't match the list of replicas for reassignment (0,1) for partition [topic-test,1]
10ERROR: Assigned replicas (2) don't match the list of replicas for reassignment (1,2) for partition [topic-test,3]
11ERROR: Assigned replicas (2) don't match the list of replicas for reassignment (2,1) for partition [topic-test,0]
12ERROR: Assigned replicas (1) don't match the list of replicas for reassignment (1,2) for partition [topic-test,2]
13ERROR: Assigned replicas (1) don't match the list of replicas for reassignment (1,0) for partition [topic-test,5]
14ERROR: Assigned replicas (0) don't match the list of replicas for reassignment (1,0) for partition [topic-test,4]
15Reassignment of partition [topic-test,1] failed
16Reassignment of partition [topic-test,3] failed
17Reassignment of partition [topic-test,0] failed
18Reassignment of partition [topic-test,2] failed
19Reassignment of partition [topic-test,5] failed
20Reassignment of partition [topic-test,4] failed
21
22//上边返回成功了再执行
23bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file ./topic-test.json --execute
24
25topic-test.json文件的格式
26{
27 "partitions": [{
28 "topic": "topic-test"
29 "partition": 0,
30 "replicas": [2, 1]
31 },
32 {
33 "topic": "topic-test",
34 "partition": 1,
35 "replicas": [0, 1]
36 }
37 }
38 ],
39 "version": 1
40
41}
42
kafka 相关例子
1bin/kafka-console-producer.sh \
2--broker-list master:9092,slave01:9092,slave02:9092 \
3--topic test
4
5
6
kafka 集群的注意点
获取到 Kafka 集群的 zookeeper.connect 信息。
1通过读取 config/server.properties 文件,获取到 Kafka 集群的 zookeeper.connect 信息。
2为简化后续步骤的信息输入,这里 创建一个临时变量 zkUrls 保存这一信息,
3例如:
43. cat config/server.properties | grep "^zookeeper.connect " 然后对 zkUrls 进行赋值:
54. zkUrls=" 192.168.115.11:24002,192.168.115.12:24002,192.168.115.3:24002/kafka"
65. 创建一个 Topic,名为 testTopic: bin/kafka-topics.sh --zookeeper $zkUrls
7--create --topic testTopic --partitions 1 --replication-factor 2
86. 查询集群中的 Topic 信息:
9bin/kafka-topics.sh --zookeeper $zkUrls --list bin/kafka-topics.sh --zookeeper $zkUrls --describe --topic testTopic
107. 删除 5 中创建的Topic:
11bin/kafka-topics.sh --zookeeper $zkUrls --delete --topic testTopic
128. 再次查询集群中的 Topic 信息:
13bin/kafka-topics.sh --zookeeper $zkUrls --list bin/kafka-topics.sh --zookeeper $zkUrls --describe --topic testTopic
华为云客户端kafka相关命令
kerberos验证
-
进入安装好的kafka客户端
1cd /htga/client
-
执行以下命令配置环境变量
1source bigdata_env
-
执行以下命令,进行用户认证
1kinit 组件业务用户 2kinit cjpt
-
输入密码
1SNI!@ffer34
-
执行以下命令切换到Kafka客户端安装目录
1cd Kafka/kafka/
kafka相关命令
查看当前集群的topic列表
1bin/kafka-topics.sh --list --zookeeper <ZooKeeper集群IP:24002/kafka>
2#样例如下:
3bin/kafka-topics.sh --list --zookeeper 13.104.24.63:24002,13.104.24.64:24002,13.104.24.65:24002/kafka
查看单个Topic详细信息。
1bin/kafka-topics.sh --describe --zookeeper <ZooKeeper集群IP:24002/kafka> --topic <Topic名称>
2#样例如下
3bin/kafka-topics.sh --describe --zookeeper 13.104.24.63:24002,13.104.24.64:24002,13.104.24.65:24002/kafka --topic topic_zizhu_gsm
创建Topic,由管理员用户操作
1bin/kafka-topics.sh --create --zookeeper <ZooKeeper集群IP:24002/kafka> --partitions 6 --replication-factor 3 --topic <Topic名称>
2#样例如下
3bin/kafka-topics.sh --create --zookeeper 13.104.24.63:24002,13.104.24.64:24002,13.104.24.65:24002/kafka --partitions 6 --replication-factor 3 --topic topic_test
删除Topic,由管理员用户操作。
1bin/kafka-topics.sh --delete --zookeeper <ZooKeeper集群IP:24002/kafka> --topic <Topic名称>
2#样例如下
3bin/kafka-topics.sh --delete --zookeeper 13.104.24.63:24002,13.104.24.64:24002,13.104.24.65:24002/kafka --topic topic_test
New Producer API生产消息,需要拥有该Topic生产者权限。
1bin/kafka-console-producer.sh --broker-list <Kafka集群IP:21007> --topic <Topic名称> --producer.config config/producer.properties
2#样例如下
3bin/kafka-console-producer.sh --broker-list 13.104.24.74:21007,13.104.24.75:21007,13.104.24.76:21007 --topic zx_video --producer.config config/producer.properties
New Consumer API消费数据,需要拥有该Topic的消费者权限
1bin/kafka-console-consumer.sh --topic <Topic名称> --bootstrap-server <Kafka集群IP:21007> --new-consumer --consumer.config config/consumer.properties
2#样例如下:
3bin/kafka-console-consumer.sh --topic topic_zizhu_gsm --bootstrap-server 13.104.24.74:21007,13.104.24.75:21007,13.104.24.76:21007 --new-consumer --consumer.config config/consumer.properties
赋Consumer权限命令,由管理员用户操作。
1bin/kafka-acls.sh --authorizer-properties zookeeper.connect=<ZooKeeper集群IP:24002/kafka > --add --allow-principal User:<用户名> --consumer --topic <Topic名称> --group <消费者组名称>
赋Producer权限命令,由管理员用户操作
1bin/kafka-acls.sh --authorizer-properties zookeeper.connect=<ZooKeeper集群IP:24002/kafka > --add --allow-principal User:<用户名> --producer --topic <Topic名称>