背景: 如同其他分布式系统一样,在kafka集群中,单Topic的partition也并不是越多越好,但通常对于业务方来说,可能会简单的根据生产者或消费者的处理能力来提出扩partition的需求,此时就需要根据具体的场景进行分析以确定partition的数量。

对于Kafka集群承载的业务Topic来说,分区的数量,可以体现出整个业务的量级同时能够尽可能的提供更高的吞吐,但并不是越多的分区就意味着越高的吞吐和处理能力,通常情况下需要业务方和基础服务方一起来进行分析。

以下为多分区Topic的优缺点,可以适当根据需求和场景进行规划分区数量。

目录

[TOC]

多分区可以提高吞吐(Higher Throughput)

首先,我们需要理解的是,在kafka集群中,topic分区partition是一个并行单位。

在生产者和broker端,可以完全并行完成对不同分区的写入操作。一些昂贵的操作,比如压缩(compression),需要消耗更多的硬件资源。

在消费者端,kafka总是会给每个消费者线程一个单分区的数据,因此当分区的数量其实也就限制了消费者线程的最大数据量,如果消费者不足以在最短时间消费数据,可以通过优化客户端程序或增加partition数量的方式(同时增加消费者线程)来缓解,但后者需要topic级别的调整,同时需要确认生产者是否有分区绑定相关操作。

因此,一般而言,kafka集群的拥有越多的分区也意味着可以获取更高的吞吐。

选择分区的数量一般可用基于吞吐来进行简单计算。

您可以在单个分区上对生产(称为p)和消费(称为c)所能实现的所有性能进行度量。假如我们的目标吞吐是t,然后我们可以设置分区数量为max(t/p, t/c)

在生产者端,可以通过设置batch size,compression,ack,replication factor等参数来计算单个分区的吞吐。 不过如LinkedIn-Benchmarking中显示,通常单个分区可以承载每秒10 Million调数据的写入。

在消费者端吞吐量通常依赖于应用程序,因为它对应于使用者逻辑处理每个消息的速度,因此得实际进行测量。

尽管在topic创建后期,可以根据需求进行扩容分区,但是对于有keys的消息需要注意。

因为当生产者写入一个带key的消息后,kafka将依据该key的hash来决定数据映射到具体的分区,这样就可以保证具有相同key的消息会被路由到同一个分区,从而保证了一类消息的有序性。

如果此时分区数量突然增加,将可能导致数据的无序性,为了避免未来这种问题的出现,一般而言会在业务接入初期进行分区的过度分配over-partition,即在topic创建时尽量多创建一些,但基本上在接入之前需要预估未来的吞吐量以确定一个合理的分区数量,这样即使当前集群规模较小,可以对topic进行提前规划,待后期集群规模大之后进行迁移即可,否则刚开始因为集群规模小,而对分区数量没有合理规划,后期会比较麻烦,而且这样当生产者使用带key的消息时,您可以跟上吞吐量的增长,而不会破坏应用程序中的语义。

除了吞吐量之外,在规划分区数时还需要考虑一些其他的因素,因为在吞吐增加的同时可能会增加一些其他影响。

需要更多的Open File Handles

每个分区会映射到broker机器的其中一个目录,在每个日志目录每个日志段(segment)包含四个文件:

  • 00000000000029487197.index: 索引文件
  • 00000000000029487197.log: 真正的日志文件
  • 00000000000029487197.snapshot: 快照
  • 00000000000029487197.timeindex: 时间索引

在kafka中,broker会为每个segment打开一个file handle ,不过这仅是一个配置问题,通常在生产集群可以配置OS层面的打开文件数为30 thousand

多分区可能增加整体不可用性

整体可用性的降低是因为在异常时有更多的分区需要进行恢复数据,此时多分区之间的数据同步可能在网络上需要消耗。

kafka集群内部的副本机制,可以提供比较高的可用性和持久性。一个分区如果有多个副本,每个副本将会存储到不同的broker上,副本被设计为leader和follow副本。

在内部,Kafka自动管理所有这些副本,并确保它们保持同步,生产者和消费者对分区的请求都将由leader副本来响应,当broker宕机后,该节点上的leader副本将暂时不可用,kafka内部的controller角色将自动将分区的leader副本切换到其他满足条件的副本上,以尽快提供读写请求(是否能尽快切换到其他副本上取决于数据的一致性和可靠性要求)。

注意:leader副本的切换动作会涉及到controller从zookeeper中去获取对应分区的相关副本元数据,在controller内部,操作zookeeper是串行的。

在通常情况下,当一个Broker被优雅地关闭时,控制器会主动地将leader从关闭的代理中移开,每次只关闭一个,一个leader的切换通常仅会需要消耗几毫秒,因此,对于客户端而言,当使用优雅重启时,仅有一个很小的不可用窗口。

然而,当broker被异常关闭时(kill -9),观测到的不可用性可能与分区的数量成正比。假设每个broker共2000个分区,每个分区共2个副本,基本每个broker将负责1000个分区,当该broker异常关闭时,所有1000个分区将在同一时间不可用。假设为单个分区选主的时间需要消耗5ms,选举1000个分区将消耗5s。

因此,对于某些分区,其观察到的不可用性可能是5秒加上检测故障所需的时间。

如果不行的是,异常关闭的broker节点刚好是controller角色,在这种情况下,分区选leader的操作将不会立即进行,知道controller本身转移到正常的broker节点上。

controller控制器自动发生故障转移,但需要新控制器在初始化期间从ZooKeeper读取每个分区的一些元数据。

例如,如果Kafka集群中有10,000个分区,并且每个分区初始化来自ZooKeeper的元数据需要2 ms时间,这可能会给不可用窗口增加20秒。

一般来说,异常的关闭是罕见的。但是,如果关心在这些罕见情况下的可用性,那么最好将每个Broker的分区数量限制在2到4000个,集群中的分区总数限制在数万个以内。

可能增加端到端的延迟(End-to-end Latency)

Kafka中的端到端延迟是由生产者发布消息到消费者读取消息的时间定义的。

Kafka只在提交消息之后才向使用者公开消息(commited log),即当消息被复制到所有同步副本时。因此,提交消息的时间可能是端到端延迟的重要部分。

默认情况下,Kafka的broker节点仅使用一个线程来复制来自另一个Broker的数据,用于在两个Broker之间共享副本的所有分区。(可以设置适当大参数来调整同步线程数)

实验表明,将1000个分区从一个Broker复制到另一个Broker可以增加大约20 ms延迟,这意味着端到端延迟至少是20 ms,这对于某些实时计算类的应用来说,整体的延迟其实有点高,当然如果整个集群比较大时,分区分布在不同的broker上,该问题可以适当缓解(其余的broker可以从这一个broker上平均fetch消息)

因此,由于提交消息而增加的延迟将只有几ms,而不是几十ms。

如果关系延迟指标,可以参照如下方式来计算分区:

将每个Broker的分区数量限制在100 * b * r可能是一个好主意,其中b是Kafka集群中的Broker数量,r是复制因子。

更多的分区可能在客户端需要更多的内存

在客户端在内部,生产者缓冲每个分区的消息。在积累了足够的数据或经过了足够的时间之后,将从缓冲区中删除累积的消息并将其发送给Broker。

如果增加分区的数量,消息将在客户端区域中的更多分区中累积。所使用的内存总量现在可能超过了配置的内存限制。

当发生这种情况时,生产者必须阻止或删除任何新消息,这两种方法都不理想。为了防止这种情况发生,需要重新配置生成器,使其具有更大的内存大小。

根据经验,要获得良好的吞吐量,应该为生产者中生成的每个分区分配至少几十KB的内存,如果分区数量显著增加,还应该调整内存总量。

消费者也存在类似的问题。消费者为每个分区获取一批消息。消费者使用的分区越多,所需的内存就越多。然而,这通常只是一个针对非实时用户的问题。

总结

通常,Kafka集群中的分区越多,吞吐量就越高。但是,必须意识到总分区或每个Broker拥有太多分区对可用性和延迟等方面的潜在影响,通常这部分需要业务方和基础服务方进行合理规划和调整。

相关文档


公众号