一文读懂kafka集群规划和调优

前言: 通常对于初创企业或者初创业务团队来说,对于开源组件的使用都会显得比较随意,而这种情况会随着业务量级的增长和时间的推移导致开源服务的滥用和乱用,进而造成的结果就是整体业务的稳定性和可靠性相对较差,而此时,扩容的行为是边际效应递减的,如果此时不对整体业务以及开源服务进行规划和改造,那么风险和成本将是同比增长的,因此在开始阶段,需要对开源组件,特别是分布式组件进行场景规划和调优。


一文读懂kafka集群规划和调优


在我过去的工作经历中,经历过类似服务的有 Redis集群 , ElasticSearch 集群,虽然整体改造后并不一定将成本降到最低,但是可以将服务的可用性和可靠性提高很多,而且根据业务场景以及使用方式来规划集群后会使得整体的边际成本呈递减状态。

笔者目前所处的团队所管理的 kafka集群 也处于该种状态,集群当前规模大约为20+ECS,总数据量大约400T,其中承接的Topic服务主要分为 日志收集 、 数据管道 、 流式计算 、 业务事件存储 几种大场景,我们需要知道,以上几种使用场景对于kafka集群的的 可用性 , 可靠性 , 数据一致性 要求其实是不同的,如果将所有场景耦合到同一个集群,在数据量较大的情况下,任何的小异常点都可能造成整体服务受到影响,并且整个集群的恢复周期会很长,如果业务没有及时的降级策略很可能影响核心业务的处理。

鉴于以前对开源分布式服务的规划和改造经验,本篇文章将根据官方文档以及经验来分享一些关于Kafka生产集群规划和运维管理相关的知识.

一、Kafka集群运维和规划

其实任何开源的分布式系统在开始规划时,就需要考虑到业务场景,以及生产环境的周边可观测系统,比如如下几个方面:

  • 规划和部署生产级别的集群(包含官方最佳实践以及一些针对不停场景推荐的配置项变更)
  • 执行一些部署后的操作(比如滚动重启集群,集群备份,数据迁移等等)
  • 集群的可观测性(监控重要统计数据,理解kafka内部行为的具体含义以及是否需要报警通知)

二、集群规划

本节主要介绍,kafka集群在生产环境部署前的一些规划,包含硬件配置选择,网络以及文件系统和其他考虑的选型.

1.硬件和OS

注意: 通常对于分布式的开源服务来将对于硬件本身没有太高的要求,但当需要承载一定量级的业务时,我们需要考虑一些硬件是否能够支撑对应的业务场景,并且通常来讲针对不同的业务场景选择不同的硬件(如果可选择),也许会适当降低资源成本。

1.0 OS

一般来说,对于运行Linux中的kafka集群不需要过多的OS以及kernel参数调整,但如下几种情况可以根据具体情况进行参考:

  • 文件描述符(fd) : broker节点上fd限制可以参考 (number_of_partitions)*(partition_size/segment_size) 公式
  • 套接字缓冲区(socket buffer) : 该参数可以增加多数据中心之间的数据传输(一般异地集群备份建议调整以增加吞吐)
  • 最大内存映射区域数(vm.max_map_count) index/timeindex OutOfMemoryError

注意: 每个分区的日志段的数量取决于段(segment)的大小,负载,以及保留策略

kafka使用大量的文件以及socket和客户端进行通讯,我们都知道,在Linux下,一切皆文件,因此系统需要设置更多的可用文件描述符>。

在大多数的默认系统配置中,单个进程可用使用1024个文件描述符,对于kafka来说太小了,建议调整到至少 100000 ,但是通常和操作 系统以及发行版本有关系,需要根据具体的OS进行调整。

可用通过计算Kafka数据目录中的 .index 文件来计算当前的mmap编号。 .index 文件大多数代表了内存映射文件。

<code> 
$ find . -name 

'*index'

| wc -l

/<code>

2.为每个session设置vm.max_map_count参数,这将计算当前内存映射文件的数量,mmap限制的最小值就是打开文件的ulimit限制

该值要远大于index的数量

$ sysctl -w vm.max_map_count=262144

3.持久化mmap参数

sysctl -p

1.1 内存

Kafka严重依赖文件系统来存储和缓存消息。

所有数据都会立即写入文件系统上的持久日志中,而不必刷新到磁盘。实际上,这仅意味着将其转移到内核的页面缓存 pagecache 中。 当回收内存时,操作系统会将可用内存转移到磁盘缓存中,而对性能的影响很小。

同时,Kafka非常小心地使用堆空间 heap space ,不需要将堆大小设置为超过6 GB,这样将会在32G内存的机器上缓存28-30G的数据到文件系统。

因此,生产集群需要足够的内存来缓存活动的reader和writer。在Confluent的使用建议中,提出了对内存需求的粗略估计方式,比如需要缓冲30s,那么内存需求大概为 write_throughput * 30 。

通常来讲 64G 内存配置的机器是一个不错的选择.

1.2 CPU

大多数的kafka集群对于cpu的要求不是那么高,因此对于CPU的配置没有像其他资源那么重要(但是通常同等资源都是由一定比例配比的)。

注意: 如果开启了SSL,那么可能对集群的整体性能有一定影响,且对cpu有一定要求,具体需要考虑到cpu的类型以及具体的JVM实现细节(通常来讲内部服务均不会开启SSL,因为管理成本很高,且性能上略有损失,安全上可以通过整体的IT安全进行要求和管控)

通常情况下建议使用较新的多核处理器,通用集群可以设置为 24 核心。

如果需要在更快的CPU或更多的内核之间进行选择,请选择更多的内核,因为多核提供的额外并发性将远远超过稍快的时钟速度。

1.3 Disk

生产集群建议使用多块磁盘来最大化整体的吞吐,不要与应用程序日志或其他OS文件系统活动共享用于Kafka数据的相同驱动器,以确保良好的延迟。

在官方的最佳实践中建议,可以将 多块磁盘构建成RAID ,或者直接将独立的 多块磁盘 作为kafka的数据存储,也就是JBOD方案(Just Bunch Of Disks)。

备注: 如果软RAID的话其实会在存储方面增加一层数据均衡,增加了集群的复杂度,因此一般可用选择后者,而且RAID主要用于提供冗余,对于开源分布式服务来讲,在软件层面上基本都会保证数据的冗余。

不过在实际的场景中,具体选择使用多块盘做RAID还是直接使用多块盘挂载,以下有几种场景可以考虑:

如果配置多个数据目录,则Broker将在路径中放置一个新分区,该分区中当前存储的分区数最少。每个分区将完全位于数据目录之一中,如果 分区之间的数据不平衡,就会导致磁盘之间的负载不平衡 。

RAID在平衡磁盘之间的负载方面做得更好,它能在较低的水平上平衡负载。RAID的主要缺点是减少了可用的磁盘空间(RAID0除外),好处是可以容忍磁盘故障(RAID1,RAID5等)。

在生产中强烈不建议使用RAID 5 or RAID 6 ,会严重影响到写吞吐的性能,并且在磁盘故障时会有重建阵列的I/O成本( RAID0下也存在重建I/O的成本 )

如果额外的成本可以接受,建议使用RAID10(容量减半,多一份冗余),否则,建议Kafka服务器配置多个日志目录,每个目录都安装在单独的驱动器上。

linked使用8x7200转的sata磁盘,一般来说,磁盘吞吐量是性能瓶颈,磁盘越多越好。

kafka官方文档中其实建议使用多个驱动器以获得良好的吞吐量,因为每个路径都独立挂载在不同的磁盘上,这使得多块物理磁盘磁头同时执行物理I/O写操作,可以极大地加速Kafka消息生产的速度。

注意: 通常在使用本地盘时,容量可能会比较大,当磁盘容量超过2T时,Linux下默认的MBR分区就不能满足容量的要求了,此时需要在分区时进行GPT分区,否则等线上业务真正上线后会发现超过2T的空间就被浪费了。

另外一个问题就是,磁盘容量规划的问题,虽然kafka默认为全部的日志数据设置了7天保留时间,但是往往在海量的数据消费场景中,单天的数据量也可能达到好几个T,这就导致了需要提前对业务的场景和使用方式进行提前规划,并提前计算最少的存储量。

但一般对于磁盘空间的规划可以根据消息量大概估算,比如一天7亿条消息,单条1kb,消息副本为3(可保证2节点同时挂),那么大概的存储空间为 7亿*3*1KB/1000/1000=2100G ,也就是这种规模下的数据,一天产生2T的数据,实际使用数据为700G,1400G数据为冗余数据,此时我们在规划磁盘容量时就需要考虑到单天数据量的大小,以及数据的保留时间。

注意: 如果客户端开启了消息压缩,整体的数据能再相对小一些,可以根据具体情况来考虑

1.4 Network

在分布式系统中,快速可靠的网络是性能的一个重要组成部分( 因此通常分布式系统中建议在同机房 )。

低延迟确保节点可以轻松通信,而高带宽有助于集群节点之前的副本移动和恢复( 往往在kafka集群中优先瓶颈点都是带宽 )。

目前大多数的数据中心基本都是千兆(1 GbE)或万兆网络(10 GbE),对于大多数集群通常都是足够的。

应该尽量避免集群跨越多个数据中心,即使数据中心在很近的距离同地区,也要避免跨越巨大地理距离的集群。

备注: 实际上在分布式系统中分区是肯定会发生的,通过避免跨机房部署能够降低分区的概率

Kafka集群假设所有节点都是相等的,较大的延迟可能会加剧分布式系统中的问题,并使调试和解决变得更加困难。

注意: 如果业务上需要异地进行数据读写,推荐的方法是在每个数据中心中部署一个本地Kafka集群,每个数据中心中的应用程序实例只与它们的本地集群交互,并在集群之间进行镜像(kafka提供了mirror-maker工具)。

1.5 Filesystem

现在操作系统中,大部分的系统应该都使用了 Ext4 或 XFS 系统,官方也推荐使用这两种文件系统,但是对于具体的文件系统的选择,官方提供了如下几种场景和需要注意的点。

使用各种文件系统创建和挂载选项,在具有大量消息负载的集群上执行了比较测试,XFS带来了更好的本地时间(最好的EXT4配置是160ms vs. 250ms+),以及更低的平均等待时间。XFS性能在磁盘性能方面的可变性也较小。

不论是使用哪种文件系统,推荐修改默认的挂载参数:

  • noatime : 此选项禁止在读取文件时更新文件的atime(最后访问时间)属性,这可以消除大量的文件系统写操作,特别是在引导消费者的情况下,Kafka完全不依赖于atime属性,所以禁用它是安全的
<code>$ cat /etc/fstab
UUID=

"4231b126-7e67-45c4-b8bf-554006291d35"

/export1 xfs defaults,noatime 0 2/<code>

XFS文件系统挂载参数优化:

  • largeio : 这会影响stat调用报告的首选I/O大小,尽管这可以在较大的磁盘写入上实现更高的性能,但实际上对性能的影响很小或没有影响
  • nobarrier : 于具有电池后备缓存的基础设备,此选项可以通过禁用定期写刷新来提供更高的性能。 但是,如果基础设备的行为良>好,它将向文件系统报告它不需要刷新,并且此选项将无效。

EXT文件系统挂载参数优化:

注意: 在ext4文件系统下,要获得最佳性能,则需要调整几个参数。这些选项在故障情况下通常是不安全的,并且将导致更多的数据丢失和损坏,对于单个broker故障,可以擦除磁盘并从群集重建副本,在多数情况下,多broker异常意味着潜在的文件系统损坏,无法轻易恢复。

  • data=writeback : Ext4默认为data = ordered,这使某些写入操作具有很强的顺序,在Kafka场景下其实不需要该参数,此设置消除了排序约束,并且似乎大大减少了延迟
  • Disabling journaling : 日志记录是一个折衷:它使服务器崩溃后重新引导更快,但它引入了大量额外的锁定,增加了写入性能的差异
  • commit=num_secs : 这调整了ext4提交到其元数据日志的频率。 将此值设置为较低的值可以减少崩溃期间未刷新数据的丢失。 将此值设置为较高的值将提高吞吐量。
  • nobh : 此设置控制在使用data=writeback模式时附加的排序保证,可以提高吞吐量和延迟
  • delalloc : 延迟分配意味着文件系统避免在物理写入发生之前分配任何块,此功能非常适合吞吐量

1.6 Application vs. OS Flush Management

Kafka始终会立即将所有数据写入文件系统,并支持配置刷新策略的功能,该策略控制何时使用刷新将数据从OS缓存中强制出到磁盘上。

可以控制此刷新策略,以在一段时间后或在写入一定数量的消息之后将数据强制到磁盘。 在此配置中有几种选择。

Kafka必须最终调用fsync才能知道数据已刷新。

当从崩溃中恢复任何未知的日志段时,Kafka将通过检查其消息的CRC来检查每条消息的完整性,并在启动时执行的恢复过程中重建附带的偏移索引文件 。

请注意,Kafka中的持久性不需要将数据同步到磁盘,因为发生故障的节点将始终从其副本中恢复。

我们建议使用默认刷新设置,该设置将完全禁用应用程序的fsync。

这意味着依靠操作系统和Kafka自己的后台刷新来完成后台刷新。

这为所有用途提供了最佳的解决方案:无需调节配置,提高吞吐量和延迟,并提供完全恢复保证。

通常,我们认为 复制 提供的保证要强于 同步到本地磁盘 ,但是偏执狂仍然更愿意同时拥有两者,并且仍然支持应用程序级fsync策略。

使用应用程序级刷新设置的缺点是,其磁盘使用模式效率较低(它给操作系统减少了重新排序写操作的余地),并且由于大多数Linux文件系统中的fsync阻止了文件写入,因此它会引入延迟。 后台刷新进行更精细的页面级锁定。

1.7 理解Linux操作系统的Flush行为

在Linux中,写入文件系统的数据将保留在页面缓存中,直到必须将其写出到磁盘为止(由于应用程序级fsync或操作系统自身的刷新策略)。

数据刷新是通过一组称为pdflush的后台线程完成的(或在2.6.32版的内核“冲洗线程”中)。

Pdflush具有可配置的策略,该策略控制可以在缓存中维护多少脏数据以及必须将多脏数据写回到磁盘的时间.

pdflush刷新策略

当Pdflush无法跟上写入数据的速度时,它将最终导致写入过程阻塞写入中的延迟,从而减慢数据的累积。

与进程内缓存相比,使用pagecache存储将写入磁盘的数据有几个优点:

  • I/O调度将连续的小写批量写到更大的物理写中,从而提高吞吐量
  • I/O调度尝试重新排序写操作,以最小化磁盘头的移动,从而提高吞吐量
  • 它会自动使用机器上所有的空闲内存

2.节点配置

  • 1.避免使用太小的节点配置,因为这样整个集群的节点数可能会特别多,在这种机器上运行kafka将会有更多的开销
  • 2.避免使用太高配计算机,因为它们经常导致资源使用不平衡,比如内存优先不够了,但cpu还剩余很多。如果在每个高配机器上运行多个broker节点,将会增加整体的复杂度

3.JVM配置

在当前大多数Java类应用下,基本都在使用JDK8(建议使用最新的jdk8),在此环境下默认使用的是 G1 的垃圾回收器,因此一般情况下仅需要修改如下参数即可:

  • MaxGCPauseMillis : 指定每次垃圾回收默认的停顿时间,默认值200ms
  • InitiatingHeapOccupancyPercent : G1 启动新一轮垃圾回收之前可以使用的堆内存百分比,默认值是45

官方推荐的GC参数如下:

<code>

-Xms6g -Xmx6g -XX:MetaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20

-XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80/<code>

作为参考,LinkedIn最繁忙的集群当前是如下情况:

  • 60 brokers
  • 50k partitions (replication factor 2)
  • 800k messages/sec in
  • 300 MBps inbound, 1 GBps + outbound

上面的GC调优看起来比较激进,但集群中的所有broker都会有90%的gc中止时间,大概21ms,它们做不到每秒一个young GC

4.kafka核心配置

Kafka默认设置在大多数情况下都能工作,特别是与性能相关的设置和选项,但是考虑到集群的规划以及场景用途,有一些补充的配置参数可以对生产环境进行调优。

通常配置上来讲会分为 broker端配置 、 produser端配置 、 consumer端配置 ,由于各个业务方当前均使用开源客户端,因此对于客户端的配置无法做到严格管控(如果有内部的sdk封装可能会比较好)。

注意: 如下相关参数仅为官网中重要程度较高的参数,参数较多,建议收藏保存,仔细研读。

4.1 重要的客户端配置

  • acks : 消息一致性保证(0:投递即成功,1:副本同步即成功,all/-1:全部ISR同步即成功)
  • compression : 压缩类型
  • batch size : 批处理大小

注意: 对于消费者来说,最重要的参数为 fetch size

鉴于集群的整体可用性可靠性其实很大一部分和客户端的使用方式有关,后面会列举一些常见的生产者和消费者端的核心参数

kafka详细参数列表

4.2 broker核心配置

zookeeper.connect

zk连接串,建议写全部的zk节点地址。

brokers链接的zk集群地址,该值默认采用 host:ip/path 来指定一个zk中的znode节点,通常情况下用来隔离环境. kafka的zk路径中使 用了 chroot 环境,如果不指定使用默认的 / 来作为存储路径。

broker.id

broker唯一标识,该值可以任意设定(int类型)。默认 reserved.broker.max 开始,每次+1

在分布式集群中,可以手动指定每个broker的节点信息,同时也可以使用如下方式来自动生成每个broker的id

<code> 

broker.id.generation.enable

=

true

/<code>

log.dirs

kafka存储的日志消息都是保存在该参数指定的日志路径下,该值可以指定多个磁盘路径,通常我们会绑定到多个磁盘上。比如 log.dirs=/exoprt/kafka1,/export/kafka2,/export/kafka3

对应的另外一个默认参数为 log.dir ,默认值为 /tmp/kafka-logs

listeners

broker监听列表,默认将为 PLAINTEXT://myhost:9092

advertised.listeners

监听器发布到zk集群中的地址,供客户端使用,默认采用 listeners 参数值

num.recovery.threads.per.data.dir

每个数据目录用于在启动时进行日志恢复和在关闭时进行刷新的线程数,默认值为: 1

对于如下几种情况,kafka会使用 可配置的线程池 来处理日志片段.

  • 服务器正常启动: 用于打开每个分区的日志片段
  • 服务器崩溃后重启: 用于检查和截断每个分区的日志片段
  • 服务器正常关闭: 用于关闭日志片段

默认情况下,每个日志目录采用一个线程,因为这些线程仅有在启动和关闭时才用到,所以可以适当设置大一点,并不会影响到整体服务的性能,特别是对于包含大量分区的服务器来说,一旦发生崩愤,在进行恢复时使用井行操作可能会省下数小时的时间。

需要注意的是,该值是每个日志目录的线程数,因此总线程数需要考虑到 log.dirs 的配置

备注 : 这也是在使用RAID和JBOD两种磁盘方案的另外一个考虑点

delete.topic.enable

是否允许删除topic,默认为: true

如果为false,通过管理工具删除topic仅为标记删除,此时使用 describe 命令可以查看到topic的详情信息,但是无法写入,可以通过删除 zk 中的节点来删除

备注 : 生产环境建议设置为 false ,由集群管理员定期统一的进行删除和管理

auto.create.topics.enable

默认情况下,kafka会使用如下三种方式创建topic:

  • 当一个生产者开始往主题写入消息时
  • 当一个消费者开始从主题读取消息时
  • 当任意一个客户端向主题发送元数据请求时

推荐是设置成 false ,不允许客户端直接创建topic,否则topic会无法管理。默认值为 true

auto.leader.rebalance.enable

是否开启 leader 自动平衡,默认值为 true 。后台会有线程进行定期检查leader的分布情况

kafka中有一个被称为优先副本(preferred replicas)的概念(通常分区会有主分区和副本分区的概念,主分区先写入,然后push到其他副本分区)。

如果一个分区有3个副本,且这3个副本的优先级别分别为0,1,2,根据优先副本的概念,0会作为leader 。

当0节点的broker挂掉时,会启动1这个节点broker当做leader。

当0节点的broker再次启动后,会自动恢复为此partition的leader。不会导致负载不均衡和资源浪费,这就是leader的均衡机制(前提是第一次partition在分配的时候,它本身就是一个相对平均的分配)

<code>

auto.leader.rebalance.enable

=

true

/<code>

num.partitions

自动创建topic的默认分区数,默认为1,通常生产集群不建议打开topic自动创建,一方面是不便于管理和追溯,另外一方面因为自动创建默认分区时1,且无法动态变更,造成的风险可能会比较大。

多分区的topic有这更好的数据平衡能力,并且可以帮助消费者进行并行化消费。

注意: 对于有key的数据,避免改变分区的数量

default.replication.factor

适用于自动创建的主题的默认复制因子,推荐至少设置为2,默认为1

min.insync.replicas

当使用 required.acks=-1(all) 提交到生产请求所需的ISR中的最小副本数,默认为1,建议在数据一致性要求较高的topic中设置至少为2

指定 ISR 的最小数量。当producer设置 ack=all(-1) 时,该值指定的副本必须全部写成功,才认为消息写入成功,否则生产者将抛异常( either NotEnoughReplicas or
NotEnoughReplicasAfterAppend )

注意 : min.insync.replicas 参数和生产者 ack 参数一起使用,可以加强整个消息的持久性

示例:(3副本的topic,可以设置该值为2,同时生产者ack设置为all,这将确保大多数副本没有收到写操作时,生产者直接异常)

默认值为: 1

unclean.leader.election.enable

是否启用不在ISR集中的副本以选作领导者,即使这样做可能会导致数据丢失。该参数可以提高整体Topic的可用性,但是可能会造成数据的整体不一致性(部分数据的丢失)。

kafka可用性和可靠性保证

默认值为: false

为false,就只从ISR中获取leader保证了数据的可靠性,但是partition就失效了, true 则从replica中获取,则可用性增强,但是数>据可能存在丢失情况

注意: 该参数实际上在设置的时候也有一定的争议性,比如,我们知道副本是有 ISR 的,即正在同步的副本,如果当前的broker宕>机导致需要选举leader partition,此时如果ISR内除了leader之外还有其他副本(但谁又能保证一定有呢),那直接从ISR中选举leader>即可,如果没有的话,当
auto.leader.rebalance.enable=true 时,就会去其他存活的副本中选举leader,此时可以增强整体的可用性,但是如果存活的副本不在ISR中,即意味着数据可能有一定的丢失了。但是如果该参数为false的话,ISR中没有,就直接异常了,为了保证数据的一致性。

该参数的存在其实是在可用性和可靠性之间做了一个权衡,为true时保证了可用性AP,为false时保证了一致性CP

数据一致性保证 : ISR就保存了kafka认为可靠的副本,它们具备这样的条件:

  • 落后leader的消息条数在一定阈值内
  • 或者落后在一定时间内;

num.replica.fetchers

该参数指定了fetch线程的数量(从源broker中复制消息的fetch线程),默认值: 1

其实可以适当的调整大一些,可以增强副本之前的同步效率

num.io.threads

broker处理请求的 IO 线程数,需要考虑到磁盘的IO状况。默认值为: 8

num.network.threads

指定broker用来接收来自网络的请求和发送网络的响应的线程数,默认值为: 3

background.threads

后台任务处理线程数(例如过期消息删除等)。默认值为: 10

socket相关

  • socket.send.buffer.bytes: (socket发送缓冲区:SO_SNDBUFF) 默认值: 102400
  • socket.receive.buffer.bytes: (socket接收缓冲区:SO_RCVBUFF) 默认值: 102400
  • socket.request.max.bytes: (请求最大值,message.max.bytes要小于该值较好) 默认值: 104857600

message.max.bytes

该值表示kafka允许的最大的batch大小(不是单个message的大小),默认值为 1000012 ,即1Mb.

在最新的消息格式版本中,为了提高效率,一般消息的提交都是采用batch的方式。

注意: 在以前的消息格式版本中,未压缩的记录不会分组成批,在这种情况下,此限制仅适用于单个记录。

在每个topic级别可以使用 max.message.bytes 设置

log相关(具体到topic级别)

  • log.segment.bytes: 单个日志段(segment)的大小,默认为 1073741824 ,即1GB
  • log.segment.delete.delay.ms: 日志段从文件系统中删除前等待的时间,默认为 60000 ,即1min
  • log.cleanup.policy: 保留窗口之外的日志清理策略可同时指定多个策略如: [ delete ,compact]
  • log.cleaner.enable: 启用日志清除器进程,和 cleanup.policy = compact 参数结合使用,默认为 true
  • log.cleaner.threads: 日志清理的后台线程数量,默认为 1
  • log.cleaner.delete.retention.ms: 删除的日志保留的时间,默认为 86400000
  • log.retention.bytes: 删除日志前,日志最大的大小,超过该值即删除,默认 -1 ,作用在每个partition,会影响整个topic的容量
  • log.retention.minutes(hours|ms): 日志保留时间,如果没指定,默认使用hours参数
  • log.retention.check.interval.ms: 日志清理器检查日志是否符合删除条件的频率,默认为 300000
  • log.flush.interval.messages: 将消息刷新到磁盘之前在日志分区上累积的消息数,默认为 9223372036854775807
  • log.flush.interval.ms: 主题中的消息在刷新到磁盘之前保存在内存中的最大时间,默认为 null (log.flush.scheduler.interval.ms参数的值)
  • log.flush.scheduler.interval.ms: 日志刷新器检查是否需要将日志刷新到磁盘的频率,默认 9223372036854775807
  • log.flush.offset.checkpoint.interval.ms: 更新最后一次刷新的持久记录(被作为恢复点)的频率,默认为 60000
  • log.flush.start.offset.checkpoint.interval.ms: 更新日志起始偏移量的持久记录的频率,默认 60000
  • log.roll.hours: 新日志段(segment)被创建前的最大时间,默认 168 ,如果没设置优先使用 log.roll.ms

offsets相关

  • offsets.commit.required.acks: offset提交之前是否需要ack确认,默认值: -1
  • offsets.commit.timeout.ms: 当偏移量注意 _offset 的所有副本接收到提交或超时达到该时间时,offset提交将延迟. 默认值: 5000
  • offsets.load.buffer.size: 偏移量加载到缓存中时从偏移量段读取的批处理大小. 默认值: 5242880
  • offsets.retention.check.interval.ms: 历史offset检查的频率,默认值: 600000
  • offsets.retention.minutes: 在消费者组的消费者全部异常之后,offset保留的时间,默认值: 10080
  • offsets.topic.compression.codec: 偏移量主题的压缩解码器,默认: 0
  • offsets.topic.num.partitions: offset提交主题的分区数量,默认: 50 ( 注意: 集群部署后不要改变)
  • offsets.topic.replication.factor : offset提交主题的副本数,默认: 3 (在集群大小满足此复制因子要求之前,内部主题创建将>失败,该主题非常重要,需要要求强一致性)
  • offsets.topic.segment.bytes: offset提交主题的段大小,设置相对较小,以便更快地实现日志压缩和缓存负载,默认值: 104857600 ,即1Mb

queue相关

  • queued.max.requests: 在网络阻塞线程前,数据平面允许的排队请求数,默认值: 500

replica相关

  • replica.fetch.min.bytes:每个fetch响应所需的最小字节数,默认值: 1
  • replica.fetch.wait.max.ms: 由follow副本发起的每个fetch请求的最大等待时间,该值应小于 replica.lag.time.max.ms ,以避免 为低吞吐量的主题频繁地收缩ISR,默认值: 500
  • replica.lag.time.max.ms: follow副本在该时间内没有和leader副本同步,或没有发送任何同步请求,将会被leader副本从ISR中删>除. 默认值: 10000 ,即10s
  • replica.socket.receive.buffer.bytes: 副本接收请求的网络缓冲区,默认值: 65535
  • replica.socket.timeout.ms: 网络请求的超时时间,默认值: 30000
  • replica.fetch.backoff.ms: 发生获取分区错误时要休眠的时间,参数不是很重要,默认值: 1000
  • replica.fetch.max.bytes: 尝试为每个分区获取的消息字节数,参数不是很重要,默认值: 1048576 ,即1M

broker.rack

broker所在的机架,用来感知机架的变化,通常多个分区不会放在同一个机架上

示例: RACK1 , us-east-1d

controller控制器相关

  • controlled.shutdown.enable : 启用控制器关闭,默认: true
  • controlled.shutdown.max.retries: 控制器会因为各种原因而宕机,该值表示控制器的重试次数,默认: 3
  • controlled.shutdown.retry.backoff.ms: 在每次重试之前,系统从前一次故障(控制器fail over或副本延迟)的状态中恢复过来的时 间,默认: 5000
  • controller.socket.timeout.ms : 控制器到broker角色转换的socket超时时间,默认: 30000

group相关(消费组)

  • group.max.size: 消费组中最大消费者数量
  • group.initial.rebalance.delay.ms:注册消费者允许的最小会话超时,默认: 6000

注意: 很多参数是有不同级别的生效范围的,比如:

  • read-only : 仅在broker重启后才能生效
  • per-broker : 可以为每个broker动态更新
  • cluster-wide : 可作为集群范围内的值动态更新,也可以在每个broker上更新进行测试

broker配置作用范围

示例配置

<code> 

zookeeper.connect

=[list of ZooKeeper servers]/<code>

kafka log相关配置

num.partitions=8 default.replication.factor=3 log.dirs=[List of directories. Kafka should have its own dedicated disk(s) or SSD(s).]

其他配置核心配置

broker.id=[An integer. Start with 0 and increment by 1 for each new broker.] listeners=[list of listeners] auto.create.topics.enable=false min.insync.replicas=2 queued.max.requests=[number of concurrent requests]


注意: 在kafka集群中,当broker集群参数确定后,还有一些针对topic的参数是可以进行动态调整的,以提高kafka服务的灵活性。

4.3 Topic级别的动态参数调整

Topic级别的配置

注意 : 如下topic的参数可以在使用过程中进行动态调整,使用 kafka-topic.sh 工具中的 alter 参数来直接修改topic相关的参数。

cleanup.policy

一个字符串是“删除”或“压缩”或两者兼而有之. 默认值: [compact, delete]

compression.type

日志压缩类型,默认值为 producer

delete.retention.ms

用于日志压缩主题的删除保留时间。默认值: 86400000

max.message.bytes

指定每个topic可发送的最大消息(batch size)字节数.(区别于全局的 message.max.bytes 参数)

num.partitions

指定创建topic的默认分区数量,该值默认为1,建议根据具体的情况进行设定,越多的分区对于海量数据来说可以提高吞吐,但是对于>少量数据来说,也可能增加网络消耗

注意: 分区数一旦指定,只能增加,不能减少

default.replication.factor

指定kafka副本数,默认每个主题分区均有一个副本,当该副本所在主机异常,可能造成数据的丢失,建议在适当场景将副本至少设置成 2个,以尽快保证数据的一致性。默认值: 1

注意: 自动创建主题的副本因子

log.retention.ms

kafka的log保留时间,也可以使用 log.retention.hours 参数来配置保留时间,默认168小时,即一周

log.retention.bytes

指定log保留的大小,作用在每一个partition上,加入一个topic有3个partition,设置了 log.retention.bytes 为1GB,则表示整个topic仅可以存储3GB的数据,超过该容量的数据会被进行自动删除。

此时,临时增加该topic的容量的方法就是调整该参数,或调整topic的partition个数。

log.segment.bytes

指定每个日志段的大小,通常在消息到达broker时,会被追加到分区的当前日志段上(segment),当日志段大小超过该参数指定的值(默认1GB),当前日志段就会被关闭,一个新的日志段被打开。

如果一个日志片段被关闭,就开始等待过期,该值不建议设置太小。

log.segment.ms

上面会指定日志段的分割,该参数会指定历史的日志段的过期时间。该参数会和 log.retention.bytes 一起校验,谁先满足就生效。

message.max.bytes

该值用来限制单个消息的大小,默认值为 1000 000 即 1MB ,如果超过该大小,broker不会接受,而且会抛出相关异常

注意: 该参数指的是消息被压缩后的大小,通常生产中的消息生产会使用gzip或snappy来进行压缩

注意: 消息的大小对于性能有比较显著的影响,越大负责处理网络连接和请求的线程就需要花越多的时间来处理这些请求,还会增加磁 盘写入块的大小,从而影响 IO 吞吐量。

retention.ms

规定了该主题消息被保存的时常,默认是7天,即该主题只能保存7天的消息,该值可设定在topic级别,此时会覆盖broker端的全局参数

retention.bytes

该值规定了要为该 Topic 预留多大的磁盘空间,默认为-1,该值通常情况用来设置topic的配额

4.3 Producer核心参数

bootstrap.servers

指定broker地址

key.serializer

broker 需要接收到序列化之后的 k/v 值,所以生产者需要将序列化后的值发送过来。


org.apache.kafka.common.serialization.Serializer 该类表示把键对象序列化为字节数组

  • ByteArraySerializer: 默认的序列化方式
  • StringSerializer:
  • IntegerSerializer:

value.serializer

指定序列化后的value,需要实现
org.apache.kafka.common.serialization.Serializer 接口

org.apache.kafka.common.serialization.StringSerializer

compression.type指定消息压缩类型:gzip,snappy等,

broker端也有该参数,默认值为: producer ,表示遵循生产者的压缩方式

注意: 生产者使用何种压缩方式,消费者将必须使用该方式进行解压缩

acks

该参数用来声明要有多少个分区副本接收消息后,生产者才认为消息写入成功,也就是数据一致性衡量,该参数对消息的丢失的影响较>大. 默认值为: 1

  • acks=0 : 表示生产者不知道消息是否被broker成功接收被处理,反正自己发出去了就认为是成功了,该种清理增加了吞吐,但是也>增加的数据丢失的风险,因为程序的稳定性,网络的稳定性都可能会影响到消息的生产
  • acks=1 : 只要集群中leader接收到消息并成功处理,就返回给生产者写入成功的消息。该种情况,如果发送过程中网络出现问题或>者kafka集群异常导致leader没工作而导致消息写入失败,生产者会受到写入失败相关的异常,此时生产者可进行重试
  • acks=all/-1 : 表示所有参与复制的节点都收到消息时,生产者才会接收到来自服务器端写入成功的消息,该种情况下,整体的消息 确认延迟会更高一些,但是数据的一致性也更强一些

注意: 消息的发送其实也分 sync 和 async ,即同步和异步,kafka为了保证消息高效传输会决定是同步发送还是异步发送。如果让>客户端等待服务器的响应(通过调用get()方法)也会增加延迟,如果采用客户端回调方式,延迟问题可能会有好转。

buffer.memory

该参数用来设置生产者内存缓冲区的大小,生产者会用它来缓冲要发送到服务器的消息,以此来提供消息传递的效率。默认值: 33554432

注意: 如果应用程序发送消息的速度超过发送到服务器的速度,会导致生产者空间不足,此时 send() 方法就会阻塞或者直接异常,取 决于 block.on.buffer.null 参数

retries

生产者从服务器收到的错误有可能是临时性的错误,比如暂时找不到 leader 或者当前partition正在迁移无法找到相关的partition,>这种情况下,该参数可以决定生产者的行为,如果重试次数超过之后,生产者就会放弃重试,并返回错误。

默认情况下,生产者在每次重试之间等待100ms,这个等待参数可以通过 retry.backoff.ms 来修改

batch.size

指定每次提交的batch大小,默认值: 16384

当有多个消息需要被发送 同一个分区 (如何决定是发送到同一个分区?)时,生产者会把他们发送到同一个批次里.

该参数用来指定一个批次提交的大小,当达到该batch的大小,所有的消息会被统一发送至broker

client.id

该参数用来指定客户端的id,不过可以不用指定,注册后为每个客户端生成64为的id

max.in.flight.requests.per.connection

此参数指定了生产者在收到服务器响应之前可以发送多少消息,它的值越高,就会占用越多的内存,不过也会提高吞吐量

把它设为1 可以保证消息是 按照发送的顺序 写入服务器。

timeout相关参数

  • request.timeout.ms : 生产者在发送数据时等待服务器返回的响应时间,默认值: 30000
  • metadata.fetch.timeout.ms : 指定了生产者在获取元数据(比如目标分区的首领是谁)时等待服务器返回响应的时间
  • timeout.ms : 指定了 broker 等待同步副本返回消息确认的时间,与 asks 的配置相匹配

max.block.ms

此参数指定了在调用 send() 方法或使用 partitionFor() 方法获取元数据时生产者的阻塞时间.

当生产者的发送缓冲区已捕,或者没有可用的元数据时,这些方法就会阻塞,阻塞时间超过该参数值时,生产者抛出异常

max.request.size

该参数用于控制生产者发送的 请求大小 .

它可以指能发送的单个消息的最大值,也可以指单个请求里所有消息的总大小

receive.buffer.bytes和send.buffer.bytes

为了保证可靠的消息传输,这两个参数分别指定了 TCP Socket 接收和发送数据包的缓冲区 的大小,默认为-1,表示使用操作系统的>默认值。

注意: 如果生产者或消费者与broker所处的数据中心不同,该值可以适当调大

4.4 Consumer核心参数

在消费者组中的消费者重平衡期间,消费者无法读取消息,造成整个消费者组在重平衡的期间都不可用

消费者通过向组织协调者(Kafka Broker)发送心跳来维护自己是消费者组的一员并确认其拥有的分区。

对于不同步的消费群体来说,其组织协调者可以是不同的。

只要消费者定期发送心跳,就会认为消费者是存活的并处理其分区中的消息。当消费者检索记录或者提交它所消费的记录时就会发送心>跳。

如果一段时间,消费者不发送心跳了,会话(Session)就会过期,组织协调者就会认为这个 Consumer 已经死亡,就会触发一次重平衡 。

如果消费者宕机并且停止发送消息,组织协调者会等待几秒钟,确认它死亡了才会触发重平衡.

注意: 在这段时间里,组里的消费者将不处理消息(STW)

_consumer_offset 主题就主要是用来记录相关消费者的偏移量以及消费者分区分配的

fetch.min.bytes

指定了消费者从服务器获取记录的最小字节数,默认: 1

broker 在收到消费者的数据请求时,如果可用的数据量小于 fetch.min.bytes 指定的大小,那么它会等到有足够的可用数据时才把它>返回给消费者。

这样可以降低消费者和 broker 的工作负载,因为它们在主题使用频率不是很高的时候就不用来回处理消息。

如果没有很多可用数据,但消费者的 CPU 使用率很高,那么就需要把该属性的值设得比默认值大。

如果消费者的数量比较多,把该属性的值调大可以降低 broker 的工作负载。

fetch.max.wait.ms

上面参数用来控制每次fetch时的最小数据量,但也不能一直等待数据的容量满足要求,因此还有另外一个参数,即 fetch.max.wait.ms ,指定多长时间还没满足数据容量就进行fetch数据,默认是 500ms

max.partition.fetch.bytes

指定了服务器从每个分区里返回给消费者的 最大字节数 ,默认值为 1MB .

即 KafkaConsumer.poll() 方法从每个分区返回的记录最多不超过该值指定的大小。

加入一个20分区的主题,拥有5个消费者,那么每个消费者必须至少 4MB 的内存来接收消息(每个消费者消费4个分区,每个分区返回消>费者的最大字节数1MB)。

注意: 该参数的设置要适当的设置大一些,防止单个消费者异常后,整体内存受限。

至少,该参数的值要大于 max.message.size (broker接收消息的最大字节数),否则消费者无法读取这些消息,导致消费者一直重试并>挂起。

session.timeout.ms

指定了消费者在被认为死亡之前可以与服务器断开连接的时间,默认是 3s,在这个时间内没有发送心跳就会直接认为消费者死亡,此时 协调器就会进行触发consumer rebalance.

此参数与 heartbeat.interval.ms (poll() 方法向群组协调器发送心跳的频率)强相关。

auto.offset.reset

指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下的该如何处理,默认值为 latest ,意思是在偏移量无效的情况下>,默认从最新的记录下开始读取数据。可选值为 earliest ,表示从最开始位置进行读取.

enable.auto.commit

指定了消费者是否自动提交偏移量,默认值是 true,对应 auto.commit.interval.ms 参数来保证每次提交偏移量的频率

为了避免数据重复和丢失,消费者可以设置为false,由自己决定自己的消费位置(客户端保证数据消费的一致性)

partition.assignment.strategy

PartitionAssignor (分区分配器)会根据给定的消费者和主题,决定哪些分区应该被分配到哪个消费者,默认有两个策略: Range 和 RoundRobin

max.poll.records

用于控制 单次调用call() 方法能够返回的记录数量,可以帮你控制在轮询中需要处理的数据量.

heartbeat.interval.ms

在消费组中,消费者心跳到消费者协调器的频率,默认值: 3000ms


分享到:


相關文章: