Kafka运维神器

在Kafka集群运维过程中,我们通常会借用一些开源工具来完成kafka的日常运维需求和相关问题排查,接下来介绍几个常用的kafka运维神器。

kafka-manager

由雅虎开源的kafka集群管理工具,不过现在已经改名为CMAK了(说明kafka的运维痛点还是蛮多的,项目还可以做更多的事情),核心功能如下:

  • 多集群管理
  • 集群瞬时监控(topics, brokers, 副本分布,分区分布等)
  • 优先副本选举
  • 主题,分区,副本管理(主题创建和参数调整,副本分配等)
  • Broker和Topic级别的metrics监控
  • 消费者基本信息管理
1
2
3
4
5
6
7
# 源码编译
$ ./sbt clean dist

# docker 安装
$ docker run -itd --name kafka-manager -e KAFKA_ZK_CLUSTER="zk-1:2181,zk-2:2181" -P      bgbiao/kafka-manager:2.0.0.2 
# 自定义端口
$ docker run -itd --name kafka-manager -e KAFKA_ZK_CLUSTER="zk-1:2181,zk-2:2181" -P      bgbiao/kafka-manager:2.0.0.2 -Dhttp.port=8080 

kafka-cluster

kafka-topic

kafka-topic-info

kafka-consumer

kafka-consumer-info

kafka-broker

kafka-broker-info

kafkacat

kafkacat是一个非JVM的命令行生产者和消费者,可以快速的对kafka消息进行生产和消费。

1
2
3
4
5
6
7
# mac 安装
$ brew install kafkacat 

# docker 安装
# 官方仅支持Debian和openSUSE系列
# 如下镜像支持在CentOS系列的环境中使用
$ docker pull bgbiao/kafkacat 
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# 消费者模式
$ kafkacat -b localhost:9092 -t mysql_users
% Auto-selecting Consumer mode (use -P or -C to override)
{"uid":1,"name":"Cliff","locale":"en_US","address_city":"St Louis","elite":"P"}
{"uid":2,"name":"Nick","locale":"en_US","address_city":"Palo Alto","elite":"G"}
[...]

# 生产者模式
# 默认是以换行符来决定一条消息(-D 可以指定分隔符)
# 默认从stdin来读取消息
$ kafkacat -b localhost:9092 -t new_topic -P
test

# 从文件中读取消息
# -l参数用来指定将文件中的每一个行作为一个消息进行发送,如果没有该参数,整个文本将会以一个消息发送(对于二进制的数据比较
有用)
# -T 参数可以用于回显到stdout
$ kafkacat -b localhost:9092 -t <my_topic> -T -P -l /tmp/msgs


# 可以指定消息的key,使用-K参数
$ kafkacat -b localhost:9092 -t keyed_topic -P -K:
1:foo
2:bar

$ kafkacat -b localhost:9092 -t keyed_topic -C -f 'Key: %k\nValue: %s\n'
Key: 1
Value: foo
Key: 2
Value: bar

# 设置partition
$ kafkacat -b localhost:9092 -t partitioned_topic -P -K: -p 1
1:foo

$ kafkacat -b localhost:9092 -t partitioned_topic -P -K: -p 2
2:bar

$ kafkacat -b localhost:9092 -t partitioned_topic -P -K: -p 3
3:wibble


# 元数据监听模式
# 元数据的监听模式可以使用`-L`参数,会显示当前kafka集群的状态以及主题,分区,副本,以及ISR信息。
# `-J`参数会以json格式进行输出
$ kafkacat -b localhost:9092 -L

$ kafkacat -b mybroker -L -J



# docker方式简单使用
# 从标准输入生产数据
$ docker run --rm bgbiao/kafkacat kafkacat -b kafka-broker:9092 -t test-biao -P <<EOF
> ssabdkgjf
> asfgnh
> wertnh
> waer
> awegrtn
> 2020-04-26
> end time
> EOF

# 从指定topic接收数据
$ docker run -it --rm bgbiao/kafkacat kafkacat -b kafka-broker:9092 -t test-biao -C -f '\nKey (%K bytes): %k\t\nValue (%S bytes): %s\n\Partition: %p\tOffset: %o\n--\n'

Key (-1 bytes):
Value (13 bytes): test kafkacat
Partition: 0	Offset: 11
--
% Reached end of topic test-biao [0] at offset 12

Key (-1 bytes):
Value (18 bytes): 2020-04-26 endtime
Partition: 1	Offset: 8
--
% Reached end of topic test-biao [1] at offset 9

Key (-1 bytes):
Value (7 bytes): overlay
Partition: 2	Offset: 6
--
% Reached end of topic test-biao [2] at offset 7

注意: 如果想要把docker镜像中的kafkacat二进制文件拷贝出来使用,仅需要在当前环境中安装librdkafka-devel即可。

gokafka

gokafka也是一个非JVM的kafka运维管理工具,不仅可以进行简单的消息模拟生产,而且可以对指定topic进行消息预览,同时也支持kafka集群的常见的运维管理操作.

快速开始

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
$ git clone https://github.com/goops-top/gokafka.git
$ cd gokafka
$ go build -o build/goops-kafka

$ ./build/goops-kafka
goops-kafka: A kafka tools with golang that can operate the kafka for describe,create,update and so on.

 Note: Without the jvm,so you must be specify the [--broker or --cluster and the Value must be in --config entry.]

Usage:
  gokafka [command]

Available Commands:
  consumer    consumer  a topic message data with specified kafka-cluster.
  create      create the kafka topic with some base params in specify kafka-cluster.
  describe    describe the kafka some info (cluster,topic,broker,loginfo)
  help        Help about any command
  init        init the gokafka some default config.
  list        list the kafka some info (cluster,topic,broker)
  producer    producer  a topic message data with specified kafka-cluster.
  version     print version info of goops-kafka tool

Flags:
      --broker string    指定broker地址
      --cluster string   指定集群
      --config string    指定配置文件(default is $HOME/.goops-kafka)
  -h, --help             help for gokafka

Use "gokafka [command] --help" for more information about a command.

# 使用make编译
# 会自动生成mac和linux发行版本的二进制文件
$ make default

简单使用

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# 查看工具版本
$ ./build/gokafka.mac version
Version: 0.0.1
GitBranch: master
CommitId: 445f935
Build Date: 2020-06-26T18:49:48+0800
Go Version: go1.14
OS/Arch: darwin/amd64

# 初始化配置文件
# gokafka 采用配置文件的方式快速管理多个kafka集群
$ ./build/gokafka.mac  init
gokafka config init ok.

# 在用户家目录下生成集群配置文件
$ cat ~/.goops-kafka

app: gokafka
spec:
  clusters:
  - name: test-kafka
    version: V2_5_0_0
    brokers:
    - 10.0.0.1:9092
    - 10.0.0.2:9092
    - 10.0.0.3:9092
  - name: dev-kafka
    version: V1_0_0_0
    brokers:
    - 192.168.0.22:9092
    - 192.168.0.23:9092
    - 192.168.0.24:9092

# 也可以使用--config 来指定集群配置文件
$ ./build/gokafka.mac  --config ./kafka-cluster.yaml version
Version: 0.0.1
GitBranch: master
CommitId: 445f935
Build Date: 2020-06-26T18:53:25+0800
Go Version: go1.14
OS/Arch: darwin/amd64

# 查看配置文件中的集群详情
$ ./build/gokafka.mac  list cluster
cluster:test-kafka version:V2_5_0_0 connector_brokers:[10.0.0.1:9092]
cluster:dev-kafka version:V1_0_0_0 connector_brokers:[192.168.0.22:9092]

./build/gokafka.mac  list cluster --config ./kafka-cluster.yaml
cluster:log-kafka version:V2_5_0_0 connector_brokers:[log-kafka-1.bgbiao.cn:9092]

# 注意: 当使用集群配置文件管理集群时,需要使用--cluster全局参数指定操作的目标集群
# 当然也可以直接指定broker进行操作
$ ./build/gokafka.mac  --cluster dev-kafka describe broker
controller: 3
brokers num: 3
broker list: [2 1 3]
id:2		 broker:192.168.0.22:9092
id:1		 broker:192.168.0.23:9092
id:3		 broker:192.168.0.24:9092

$ ./build/gokafka.mac  --broker 10.0.0.1:9092  describe broker
controller: 3
brokers num: 3
broker list: [2 1 3]
id:2		 broker:192.168.0.22:9092
id:1		 broker:192.168.0.23:9092
id:3		 broker:192.168.0.24:9092

常用功能

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# 1.创建topic
# 可以指定分区和副本数
$ ./build/gokafka.mac  --cluster dev-kafka create --topic test-bgbiao-1
true


# 2.查看topic
$ ./build/gokafka.mac  --cluster dev-kafka list topic --topic-list test-bgbiao-1
Topic:test-bgbiao-1	PartNum:3	Replicas:3	Config:
Topic-Part:test-bgbiao-1-2	ReplicaAssign:[1 3 2]
Topic-Part:test-bgbiao-1-1	ReplicaAssign:[2 1 3]
Topic-Part:test-bgbiao-1-0	ReplicaAssign:[3 2 1]

# 3.获取topic的分区和副本状态
$ ./build/gokafka.mac  --cluster dev-kafka describe  topic --topic-list test-bgbiao-1
Topic-Part:test-bgbiao-1-2	Leader:1	Replicas:[1 3 2]	ISR:[1 3 2]	OfflineRep:[]
Topic-Part:test-bgbiao-1-1	Leader:2	Replicas:[2 1 3]	ISR:[2 1 3]	OfflineRep:[]
Topic-Part:test-bgbiao-1-0	Leader:3	Replicas:[3 2 1]	ISR:[3 2 1]	OfflineRep:[]

# 4.向topic中生产消息
$ ./build/gokafka.mac  --cluster dev-kafka producer --topic test-bgbiao-1 --msg "Hello, BGBiao."
INFO[0000] Produce msg:Hello, BGBiao. to topic:test-bgbiao-1
INFO[0000] topic:test-bgbiao-1 send ok with offset:0

$ ./build/gokafka.mac  --cluster dev-kafka producer --topic test-bgbiao-1 --msg "Nice to meet you."
INFO[0000] Produce msg:Nice to meet you. to topic:test-bgbiao-1
INFO[0000] topic:test-bgbiao-1 send ok with offset:0

# 5.消费消息(gokafka中创建了一个默认的消费组用来预览消息)
# 在终端进行实时消费,ctrl+c 可取消
$ ./build/gokafka.mac  --cluster dev-kafka consumer --topic test-bgbiao-1
INFO[0000] Sarama consumer up and running!...
INFO[0004] part:2 offset:0
msg: Nice to meet you.
INFO[0004] part:1 offset:0
msg: Hello, BGBiao.

# 6.查看集群的broker列表以及controller
$ ./build/gokafka.mac  --cluster dev-kafka describe broker
controller: 3
brokers num: 3
broker list: [2 1 3]
id:2		 broker:192.168.0.23:9092
id:1		 broker:192.168.0.22:9092
id:3		 broker:192.168.0.24:9092

# 7.查看topic的日志大小
$ ./build/gokafka.mac  --cluster dev-kafka describe loginfo --topic-list test-bgbiao-1
topic:test-bgbiao-1
172.16.32.23:9092
logdir:/soul/data/kafka/kafka-logs
topic-part		log-size(M)		offset-lag
----------		-----------		----------
test-bgbiao-1-0		0		0
test-bgbiao-1-1		0		0
test-bgbiao-1-2		0		0
172.16.32.22:9092
logdir:/soul/data/kafka/kafka-logs
topic-part		log-size(M)		offset-lag
----------		-----------		----------
test-bgbiao-1-0		0		0
test-bgbiao-1-1		0		0
test-bgbiao-1-2		0		0
172.16.32.24:9092
logdir:/soul/data/kafka/kafka-logs
topic-part		log-size(M)		offset-lag
----------		-----------		----------
test-bgbiao-1-0		0		0
test-bgbiao-1-1		0		0
test-bgbiao-1-2		0		0

# 8.列出kafka集群的消费者组
$ ./build/gokafka.mac  --cluster dev-kafka list  consumer-g | grep -i gokafka
GoKafka

# 9.查看消费者组详细信息
# 因为上面我们停止了消费,该消费者组当前是空的
$ ./build/gokafka.mac  --cluster dev-kafka describe consumer-g --consumer-group-list GoKafka
--------------------------------------------------------------------------------------------
consumer-group:GoKafka consumer-state:Empty

# 我们来看一个有实际消费状态的消费者组
# 可以看到一个消费者组下的全部消费者线程以及消费者实例,消费的对应的topic列表
$ ./build/gokafka.mac  --cluster dev-kafka describe consumer-g --consumer-group-list group-sync
--------------------------------------------------------------------------------------------
consumer-group:group-sync consumer-state:Stable
consumer-id					consumer-ip			topic-list
consumer-1-a9437739-e5cb-4b41-a9d3-2640b9878965	/172.16.64.207		[sync-dev]
consumer-1-92eb690b-d327-468b-8990-9c21e1ee405d	/172.16.32.235		[sync-dev]


# 10. 还可以查看某个消费者组消费某个topic的日志详情
$ ./build/gokafka.mac  --cluster dev-kafka describe consumer-group-offset --group group-sync --topic sync-dev
sync-dev
topic-part:sync-dev-0 log-offsize:98
topic-part:sync-dev-1 log-offsize:0
topic-part:sync-dev-2 log-offsize:340
topic-part:sync-dev-3 log-offsize:261

公众号