我们都知道,自从Kafka诞生之际,就一直使用Zookeeper服务来进行kafka集群的元数据和状态管理,虽然在KIP-500中有提议未来将移除Zookeeper的依赖,使用Raft协议来实现新的元数据和状态管理,但在这之前,我们仍然需要对kafka集群的整个元数据和状态有一定理解,才能更好的维护和保障kafka集群。

前言

在kafka集群中,ZooKeeper集群用于存放集群元数据成员管理Controller 选举,以及其他一些管理类任务

  • 存放元数据: 是指主题分区的所有数据都保存在 ZooKeeper 中,且以它保存的数据为权威,其他“人” 都要与它保持对齐。
  • 成员管理: 是指 Broker 节点的注册、注销以及属性变更。
  • Controller 选举: 是指选举集群 Controller,而其他管理类任务包括但不限于主题删除、参数配置。

Zookeeper服务

在开始之前,我们首先需要对Zookeeper服务有一定的了解。在官方文档中,Zookeeper有如下能力:

  • 配置管理: 其实就是单纯的K/V存储,可以用来配置的存储和管理,实现对分布式系统组件的集中式的配置管理
  • 命名: 命令空间管理,在zookeeper中配置可通过不同的根路径来实现简单的命名空间隔离(简单的chroot隔离)
  • 分布式的同步服务: 可保证分布式的同步
  • 分组服务: 可以将一组服务配置进行分组管理,其实在Dubbo的注册中心中,就是使用分组和命名来统一进行生产者的注册和发现

Zookeeper架构

在Zookeeper中所有的命名空间和文件系统比较类似,每个节点都有一个唯一的路径,如下图:

zookeeper内部存储结构

为什么Zookeeper服务可以满足配置管理和协调服务呢?主要是因为它的节点属性,通常也是经常可能会忽略的问题,即在Zookeeper中存在两种节点: 持久节点临时节点,节点也称之为znode,而znode本身除了数据之外,还会存储一些额外的信息:

  • 数据变更的版本号
  • ACL变更的版本号
  • 时间戳

每次znode中的数据有变更,版本号都会递增,并且client在向znode获取数据时,不仅会获取实际数据,而且会获取到数据的版本,这在整个配置管理领域会相当有用。

前面说到的持久节点只要znode创建后,便一直会存在,这种节点主要用于配置的持久化存储,只要zookeeper集群整体可用,那该节点也一直可用(及时集群异常后恢复,该节点依旧存在);而临时节点 随着某个创建 znode 的会话的开始,被创建,而一旦这个会话被撤除掉,这个 znode 就会自动被 ZooKeeper 删除。

对于临时节点来讲,经常会被用在如下场景:

  • dubbo中的服务注册
  • 其他分布式服务中的leader选举

而真正使得Zookeeper在Java语言生态中有如此大影响还依赖于另外一个功能: watches,Client 可以对某个 znode 设置watch 当这个 znode 有变更的时候,就会产生watch 事件,这个事件会由 ZooKeeper 通知至 Client, 即是 Client 会收到来自 Server 的通知,可以实现数据的动态变更。(之所以说是Java语言生态,是因为在云原生领域,Etcd几乎是Zookeeper的替代品,不同的是它是Golang语言生态体系中的)

kafka集群数据在zookeeper的存储

前面对Zookeeper服务进行了大概的说明,接下来我们一起看看kafka的集群数据在zookeeper中是如何存储和分布的。

链接zk:

1
2
3
4
5
6
7
# 前面说到zookeeper支持命名空间,因此我们可以使用一个zookeeper集群来管理多个kafka集群
# 只需要在启动kafka时指定zk的chroot环境,比如/kafka 在zookeeper中将存储名称为`kafka`的集群的相关数据
sh-4.2# sh zkCli.sh -server 127.0.0.1:2181
Connecting to 127.0.0.1:2181
....
[zk: 127.0.0.1:2181(CONNECTED) 0] ls /
[kafka, zookeeper]

查看kafka数据存储结构:

1
2
[zk: 127.0.0.1:2181(CONNECTED) 1] ls /kafka
[admin, brokers, cluster, config, consumers, controller, controller_epoch, isr_change_notification, latest_producer_id_block, log_dir_event_notification]

kafka元数据存储目录 - admin: 存储管理员接口操作的相关信息,主要为topic删除事件,分区迁移事件,优先副本选举,信息(一般为临时节点) - brokers: 主要存储broker相关的信息,broker节点以及节点上的topic相关信息 - cluster: 存储kafka集群信息 - config: 存储broker,client,topic,user以及changer相关的配置信息 - consumers: 存放消费者相关信息(一般为空) - controller: 用于存放控制节点信息(注意: 该节点是一个临时节点,用于controller节点注册) - controller_epoch: 用于存放controller节点当前的年龄 - isr_change_notification: 用于存储isr的变更通知(临时节点,当有isr进行变动时,会用于事件通知,可进行watch获取集群isr状态变更) - latest_producer_id_block: 该节点用于存储处理事务相关的pid范围 - log_dir_event_notification: 日志目录事件通知

admin目录结构

1
2
3
4
5
6
[zk: 127.0.0.1:2181(CONNECTED) 2] ls /kafka/admin
[delete_topics]
[zk: 127.0.0.1:2181(CONNECTED) 3] ls /kafka/admin/delete_topics
[]
[zk: 127.0.0.1:2181(CONNECTED) 4] get  /kafka/admin/delete_topics
null

brokers目录结构

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# broker和topic列表数据
[zk: 127.0.0.1:2181(CONNECTED) 5] ls /kafka/brokers
[ids, seqid, topics]

# 表示当前集群有3个节点
[zk: 127.0.0.1:2181(CONNECTED) 6] ls /kafka/brokers/ids
[1, 2, 3]
[zk: 127.0.0.1:2181(CONNECTED) 7] ls /kafka/brokers/seqid
[]
# 表示集群当前的topic信息
[zk: 127.0.0.1:2181(CONNECTED) 8] ls /kafka/brokers/topics
[__consumer_offsets, appjsonlog_heartbeat, nginx_log,ingress_log ]

# broker详情
[zk: 127.0.0.1:2181(CONNECTED) 10] get  /kafka/brokers/ids/1
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://10.0.0.1:9092"],"jmx_port":9999,"host":"10.0.0.1","timestamp":"1588925944886","port":9092,"version":4}

# topic详情
[zk: 127.0.0.1:2181(CONNECTED) 11] get  /kafka/brokers/topics/__consumer_offsets
{"version":2,"partitions":{"1":[2,1,3],"0":[3,2,1],"2":[1,2,3],"adding_replicas":{},"removing_replicas":{}}


[zk: localhost:2181(CONNECTED) 3] ls /brokers/topics/__consumer_offsets
[partitions]
[zk: localhost:2181(CONNECTED) 4] ls /brokers/topics/__consumer_offsets/partitions
[0, 1, 2]

# 查看topic某个分区的状态详情
[zk: localhost:2181(CONNECTED) 6] ls /brokers/topics/__consumer_offsets/partitions/1
[state]
[zk: localhost:2181(CONNECTED) 7] ls /brokers/topics/__consumer_offsets/partitions/1/state
[]
[zk: localhost:2181(CONNECTED) 8] get  /brokers/topics/__consumer_offsets/partitions/1/state
{"controller_epoch":3,"leader":1,"version":1,"leader_epoch":12,"isr":[3,1]}

cluster目录结构

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
[zk: 127.0.0.1:2181(CONNECTED) 5] get   /kafka/cluster/id
{"version":"1","id":"5C-JZf4vRdqKzlca7Lv7pA"}
cZxid = 0x100000018
ctime = Fri Mar 30 22:21:07 CST 2018
mZxid = 0x100000018
mtime = Fri Mar 30 22:21:07 CST 2018
pZxid = 0x100000018
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 45
numChildren = 0

config目录结构

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
[zk: 127.0.0.1:2181(CONNECTED) 12] ls /kafka/config
[brokers, changes, clients, topics, users]

# 需要注意的是,config目录用来存放各种实体的配置,用于使用kafka相关工具对实体进行的配置变更存储
# 因此,一般在kafka集群运行后,如不设置相关动态参数,该目录下的配置一般为空
[zk: 127.0.0.1:2181(CONNECTED) 19] ls /kafka/config/brokers
[]
[zk: 127.0.0.1:2181(CONNECTED) 20] ls /kafka/config/changes
[]
[zk: 127.0.0.1:2181(CONNECTED) 21] ls /kafka/config/clients
[]
[zk: 127.0.0.1:2181(CONNECTED) 22] ls /kafka/config/topics
[__consumer_offsets]
[zk: 127.0.0.1:2181(CONNECTED) 23] ls /kafka/config/users
[]

# 可以看到,存储消费者偏移量的topic在配置中
# 是因为该topic有一些额外的topic级别参数
# 如果我们对topic参数有过动态变更,将会在这里存储
[zk: 127.0.0.1:2181(CONNECTED) 24] get /kafka/config/topics/__consumer_offsets
{"version":1,"config":{"segment.bytes":"104857600","compression.type":"producer","cleanup.policy":"compact"}}

controller目录结构

1
2
3
# 可以看到当前broker-1为集群的controller节点
[zk: 127.0.0.1:2181(CONNECTED) 30] get  /kafka/controller
{"version":1,"brokerid":1,"timestamp":"1588833869354"}

controller_epoch 目录结构

1
2
3
# 可以看到controller的年龄是2,说明controller经历过2次变更了
[zk: 127.0.0.1:2181(CONNECTED) 32] get  /kafka/controller_epoch
2

其他几个目录结构

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
# 存储事务相关的pid范围数据
# broker启动时提前预分配一段 PID,当前是 0~999,即提前分配出 1000 个 PID 来
# 一旦 PID 超过了 999,则目前会按照 1000 的步长重新分配
# 集群中所有 broker 启动时都会启动一个叫 TransactionCoordinator 的组件,该组件能够执行预分配 PID 块和分配 PID 的工作,而所有 broker 都使用 /latest_producer_id_block 节点来保存 PID 块

[zk: 127.0.0.1:2181(CONNECTED) 37] get  /kafka/latest_producer_id_block
{"version":1,"broker":1,"block_start":"19000","block_end":"19999"}


[zk: 127.0.0.1:2181(CONNECTED) 39] ls /kafka/isr_change_notification
[]
[zk: 127.0.0.1:2181(CONNECTED) 40] get  /kafka/isr_change_notification
null

[zk: 127.0.0.1:2181(CONNECTED) 41] ls /kafka/log_dir_event_notification
[]
[zk: 127.0.0.1:2181(CONNECTED) 42] get  /kafka/log_dir_event_notification
null

以上就是整个kafka元数据在zookeeper集群中的整体存储结构,下面附上一张kafka的元数据存储结构图。

kafka元数据在zk中的分布

kafka在zookeeper中的存储结构


公众号