kafka命令总结

topic的相关操作

创建topic

  • 创建一个叫做“test”的topic,它只有二个分区,一个副本

  • bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test

    名词 解释
    --zookeeper 后面加zookeeper的地址
    --replication-factor 副本数
    --partitions 分区数
    --topic topic的名字

删除topic

  • bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic 【topic name】

修改topic

  • bin/kafka-topics.sh --zookeeper localhost:2181 --alter --partitions 20 --topic test
  • 修改topic为test的分区为20

查看topic

  • bin/kafka-topics.sh --list --zookeeper localhost:2181

  • 查看集群中 topic的数量

查看某个topic的详细内容

  • bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

消费数据

消费数据相关

  • bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test <--from-beginning>
    --from-beginning 从头开始
  • bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group cjpt
  • bin/kafka-consumer-groups.sh --group cjpt-test-htga --describe --zookeeper localhost:2181
  • 2.2版本后的
  • bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.64:9092 --from-beginning --topic test
  • 查看分区情况
  • bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic test_topic --time -1 --broker-list localhost:9092 --partitions 5
  • 从头开始消费
  • bin/kafka-console-consumer.sh --zookeeper 10.4.2.66:2181 --from-beginning --topic test --consumer.config config/consumer.properties

查看消费情况

  • bin/kafka-consumer-groups.sh --group testGroup --describe --bootstrap-server localhost:9092
  • 查看testGroup的消费情况的
  • bin/kafka-consumer-groups.sh --zookeeper 10.4.2.66:2181 --describe --group test-consumer-group

查看数据的生产情况

  • bin/kafka-console-producer.sh --broker-list docker-master:9092,docker-slave01:9092,docker-slave02:9092 --topic topic_test

生产数据

  • bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

kafka性能测试

生产性能测试

bin/kafka-producer-perf-test.sh --producer-props bootstrap.servers=192.168.1.63:9092,192.168.1.64:9092,192.168.1.65:9092 --record-size 102400 --num-records 10000 --throughput -1 --topic test

消费性能测试

bin/kafka-consumer-perf-test.sh --zookeeper 192.168.1.63:2181,192.168.1.64:2181,192.168.1.65:2181 --broker-list 192.168.1.63:9092,192.168.1.64:9092,192.168.1.65:9092 --fetch-size 1024 --messages 100000 --topic test --threads 3 --group producer_consumer

kafka权限

赋Consumer权限命令

1bin/kafka-acls.sh --authorizer-properties zookeeper.connect=<ZooKeeper集群IP:24002/kafka > --add --allow-principal User:<用户名> --consumer --topic <Topic名称> --group <消费者组名称>

赋Producer权限命令,由管理员用户操作。

1bin/kafka-acls.sh --authorizer-properties zookeeper.connect=<ZooKeeper集群IP:24002/kafka > --add --allow-principal User:<用户名> --producer --topic <Topic名称>

分区计划

 1修改topic的备份数量
 2//校验是否正确
 3bin/kafka-reassign-partitions.sh --zookeeper localhost:2181  --reassignment-json-file ./topic-test.json  --verify
 4
 5输出信息:修改之前报这个信息,说备份数量和文本不一致(因为文本内容是要添加副本数,肯定不一致),证明文本没有问题,可以执行
 6
 7[root@cjmaster01 kafka_2.11-0.9.0.1]# bin/kafka-reassign-partitions.sh --zookeeper localhost:2181  --reassignment-json-file ./topic-test.json  --verify
 8Status of partition reassignment:
 9ERROR: Assigned replicas (0) don't match the list of replicas for reassignment (0,1) for partition [topic-test,1]
10ERROR: Assigned replicas (2) don't match the list of replicas for reassignment (1,2) for partition [topic-test,3]
11ERROR: Assigned replicas (2) don't match the list of replicas for reassignment (2,1) for partition [topic-test,0]
12ERROR: Assigned replicas (1) don't match the list of replicas for reassignment (1,2) for partition [topic-test,2]
13ERROR: Assigned replicas (1) don't match the list of replicas for reassignment (1,0) for partition [topic-test,5]
14ERROR: Assigned replicas (0) don't match the list of replicas for reassignment (1,0) for partition [topic-test,4]
15Reassignment of partition [topic-test,1] failed
16Reassignment of partition [topic-test,3] failed
17Reassignment of partition [topic-test,0] failed
18Reassignment of partition [topic-test,2] failed
19Reassignment of partition [topic-test,5] failed
20Reassignment of partition [topic-test,4] failed
21
22//上边返回成功了再执行
23bin/kafka-reassign-partitions.sh --zookeeper localhost:2181  --reassignment-json-file ./topic-test.json  --execute
24
25topic-test.json文件的格式
26{
27        "partitions": [{
28                        "topic": "topic-test"
29                        "partition": 0,
30                        "replicas": [2, 1]
31                },
32                {
33                        "topic": "topic-test",
34                        "partition": 1,
35                        "replicas": [0, 1]
36                }
37	}
38  ],
39  "version": 1
40
41}
42

kafka 相关例子

1bin/kafka-console-producer.sh \
2--broker-list master:9092,slave01:9092,slave02:9092 \
3--topic test
4
5
6

kafka 集群的注意点

获取到 Kafka 集群的 zookeeper.connect 信息。

 1通过读取 config/server.properties 文件,获取到 Kafka 集群的 zookeeper.connect 信息。
 2为简化后续步骤的信息输入,这里 创建一个临时变量 zkUrls 保存这一信息,
 3例如:
 43. cat config/server.properties | grep  "^zookeeper.connect " 然后对 zkUrls 进行赋值:
 54. zkUrls=" 192.168.115.11:24002,192.168.115.12:24002,192.168.115.3:24002/kafka"
 65. 创建一个 Topic,名为 testTopic: bin/kafka-topics.sh --zookeeper $zkUrls
 7--create  --topic  testTopic  --partitions 1 --replication-factor 2
 86. 查询集群中的 Topic 信息:
 9bin/kafka-topics.sh --zookeeper $zkUrls --list bin/kafka-topics.sh --zookeeper $zkUrls --describe --topic testTopic
107. 删除 5 中创建的Topic:
11bin/kafka-topics.sh --zookeeper $zkUrls  --delete --topic  testTopic
128. 再次查询集群中的 Topic 信息:
13bin/kafka-topics.sh --zookeeper $zkUrls --list bin/kafka-topics.sh --zookeeper $zkUrls --describe --topic testTopic

华为云客户端kafka相关命令

kerberos验证

  • 进入安装好的kafka客户端

    1cd /htga/client
    
  • 执行以下命令配置环境变量

    1source bigdata_env 
    
  • 执行以下命令,进行用户认证

    1kinit 组件业务用户
    2kinit cjpt
    
  • 输入密码

    1SNI!@ffer34
    
  • 执行以下命令切换到Kafka客户端安装目录

    1cd Kafka/kafka/ 
    

kafka相关命令

查看当前集群的topic列表

1bin/kafka-topics.sh --list --zookeeper <ZooKeeper集群IP:24002/kafka>
2#样例如下:
3bin/kafka-topics.sh --list --zookeeper 13.104.24.63:24002,13.104.24.64:24002,13.104.24.65:24002/kafka

查看单个Topic详细信息。

1bin/kafka-topics.sh --describe --zookeeper <ZooKeeper集群IP:24002/kafka> --topic <Topic名称>
2#样例如下
3bin/kafka-topics.sh --describe --zookeeper 13.104.24.63:24002,13.104.24.64:24002,13.104.24.65:24002/kafka --topic topic_zizhu_gsm

创建Topic,由管理员用户操作

1bin/kafka-topics.sh --create --zookeeper <ZooKeeper集群IP:24002/kafka> --partitions 6 --replication-factor 3 --topic <Topic名称>
2#样例如下
3bin/kafka-topics.sh --create --zookeeper 13.104.24.63:24002,13.104.24.64:24002,13.104.24.65:24002/kafka --partitions 6 --replication-factor 3 --topic topic_test

删除Topic,由管理员用户操作。

1bin/kafka-topics.sh --delete --zookeeper <ZooKeeper集群IP:24002/kafka> --topic <Topic名称>
2#样例如下
3bin/kafka-topics.sh --delete --zookeeper 13.104.24.63:24002,13.104.24.64:24002,13.104.24.65:24002/kafka --topic topic_test

New Producer API生产消息,需要拥有该Topic生产者权限。

1bin/kafka-console-producer.sh --broker-list <Kafka集群IP:21007> --topic <Topic名称> --producer.config config/producer.properties 
2#样例如下
3bin/kafka-console-producer.sh --broker-list 13.104.24.74:21007,13.104.24.75:21007,13.104.24.76:21007 --topic zx_video --producer.config config/producer.properties 

New Consumer API消费数据,需要拥有该Topic的消费者权限

1bin/kafka-console-consumer.sh --topic <Topic名称> --bootstrap-server <Kafka集群IP:21007> --new-consumer --consumer.config config/consumer.properties
2#样例如下:
3bin/kafka-console-consumer.sh --topic topic_zizhu_gsm --bootstrap-server 13.104.24.74:21007,13.104.24.75:21007,13.104.24.76:21007  --new-consumer --consumer.config config/consumer.properties

赋Consumer权限命令,由管理员用户操作。

1bin/kafka-acls.sh --authorizer-properties zookeeper.connect=<ZooKeeper集群IP:24002/kafka > --add --allow-principal User:<用户名> --consumer --topic <Topic名称> --group <消费者组名称>

赋Producer权限命令,由管理员用户操作

1bin/kafka-acls.sh --authorizer-properties zookeeper.connect=<ZooKeeper集群IP:24002/kafka > --add --allow-principal User:<用户名> --producer --topic <Topic名称>