Kafka学习记录之二

Kafak消费者

消费者与消费者组

Kafka消费者从属于消费者群组。一个群组里的消费者订阅的是同一个主题,每个消费者接收主题一部分分区的消息,达到横向伸缩消费能力。

消费者群组与分区再均衡

管理员添加新的分区,会发生分区重分配,分区的所有权从一个消费者转移到另一个消费者,这样的行为称为再均衡。

消费者通过向被指派为群组协调器的broker发送心跳来维持它们和群组的从属关系以及它们对分区的所有权关系。若群组协调器认为消费者已死亡,就会触发 一次再均衡。

创建kafka消费者

创建一个消费者对象,有3个属性必选:

  • bootstrap.servers

指定broker的地址清单,地址格式:host:port。建议至少要提供两个boker的信息。

  • key.deserializer

broker希望接收到的消息的键和值转为对象。

  • value.deserializer

指定的序列化的值转为对象。

  • group.id (非必选)

属于哪一个消费者群组

订阅主题

创建好消费者后,就可以开始订阅主题了。支持使用正则表达式匹配多个主题。

轮询

消息轮询是消费者API的核心,一旦消费者订阅了主题,轮询就会处理所有的细节,包括群组协调,分区再均衡,发送心跳和获取数据。

消费者的配置

1.fetch.min.bytes

指定消费者从服务器获取记录的最小字节数。

borker收到消费者请求,可用数据小于此值 ,会等到足够可用数据时再返回给消费者,降低消费者与broker的工作负载

2.fetch.max.wait.ms

通过fetch.min.bytes告诉kafka,等到有足够的数据时才把它返回给消费者。此参数指定broker的等待时间,默认500ms

3.max.partition.fetch.bytes

该参数指定了服务器从每个分区里返回给消费者的最大字节数。默认1MB

此值必须比broker能够接收的最大消息的字节数据(max.message.size)属性大,否则消费者可能无法读取这些消息,导致消费者一直挂起重试。

单次获取数据太多,消费者需要更多的时间来处理,可能无法及时进行下一轮询来避免会话过期,需要把此值 改小,或延长会话周期

4.session.timeout.ms

指定消费者在被认为死亡之前可以与服务器断开连接的时间,默认3S

若消费者没有在此值时间内发送心跳给群组协调器,就会认为忆死亡,协调器会触发再均衡。

该属性与heartbeat.interval.ms紧密相关,一般heartbeat.interval.ms是session.timeout.ms的1/3

5.auto.offset.reset

指定消费者在读取一个没有偏移量的分区或偏移量无效的情况下该作如何处理(因消费者长时间失效,包含偏移量的记录已经过时并被删除),默认latest(从最新读取);可配置earliest(从起始读取)。

6.enable.auto.commit

指定消费者是否自动提交偏移量,默认true,可通过配置auto.commit.interval.ms控制提交频率

为了尽量避免出现重复数据和数据丢失,可设置为false

7.partition.assignment.sttrategy

根据给定的消费者和主题,决定哪些分区应该被分配给哪些消费者

kafka有两个默认的分配策略:

Range:会把主题的若干个连续的分区分配 给消费者。第一个消费者会比第二个消费者分配到更多的分区

RoundBobin: 把主题逐个分配给消费者。

  1. client.id

可以是任意字符串,broker用它来标识从客户端发送来的消息

9.max.poll.records

控制单次调用获取数据能够返回的记录数量。

10.receive.buffer.bytes 和 send.buffer.bytes

socker在读写数据时用到的TCP缓冲区也可以设置大小

设置为-1,使用操作系统默认值

提交和偏移量

消费者可以使用kafka来追踪消息在分区里的位置 (偏移量),更新分区当前位置的操作叫提交。

触发再均衡后,每个消费者可能分配到新的分区,而不是之前处理的那个,为了继续之前的工作,消费者需要读取每个分区最后一次提交的偏移量,然后从偏移量指定的地坟继续处理。

自动提交

自动提交没有为开发者留有余地来避免重复处理消息。建议程序手协提交当前偏移量。

异步提交

同步提交偏移量会导致阻塞,使用异步可降低提交频率来提升吞吐量,但再均衡后会增加重复肖息的数量。

重试异步提交前,可查看当前序列号,若相等可以安全重试,若比较大,可停止重试。

消费者关闭连接前,采用同步与异步组合提交。

注意点

  • 再均衡监听器
  • 从特定偏移量处开始处理记录
  • 消费者如何退出群

深入kafka

控制器

控制器其实就是一个负责分区首领选举的broker。kafka使用zookeeper的临时节点来选举控制器,并在节点加入集群或退出集群时通知控制器。控制器负责在节点加入或离开集群时进行分区首领选举。控制器使用epoch来避免“脑裂”(两个节点同时认为自己是当前的控制器)。

复制

复制功能是kafka架构的核心,可以在个别节点失效时仍能保证kafka的可用性和持久性。

副本有两类:

  • 首领副本

每个分区都有一个首领副本,为了保证一致性,所有生产者请求和消费者请求都会经过这个副本。

  • 跟随者副本

首领以外的副本都是跟随者副本。跟随者副本不处理来自客户端的请求,它们唯一的任务就是从首领那里复制消息,保持与首领一致的状态。如果首领发生崩溃,其中的一个跟随者会被提升为新的首领。

如果跟随者在10s内没有请求任何消息,或者虽然在请求消息,但在10s内没有请求最新的数据,则认为是不同步的,相反,持续请求得到的最新消息副本被称为同步的副本。在首领发生失效时,只有同步副本才有可能被选为新首领。

除了当前首领之外,每个分区都有一个首先首领— 创建主题时选定的首领就是分区的首选首领。

注:如果手动进行副本分配,第一个指定的副本就是首选首领,所以要确保首选首领被传播到其它broker上,避免让包含了首领的broker负载过重,而其它broker去无法为它们分担负载。

处理请求

broker的大部分工作是处理客户端、分区副本和控制器发送给分区首领的请求。

所有请求消息都包含一个标准消息头:

  • Request type (API key)
  • Request versin(broker可以处理不同版本的客户端请求,并根据客户端版本作出不同的响应)
  • Correlation Id – 一个具有唯一性的数字,用于标识请求消息,同时也会出现在响应消息和错误日志里(用于诊断问题)
  • Client ID –用于标识发送请求的客户端

生产请求和获取请求都必须发送给分区的首领副本。如果borker收到一个针对特定分区的请求,而该分区的首领在另一个borker上,那么发送请求的客户端会收到一个“非分区首领”的错误响应。所以客户端使用了另一种请求类型–元数据请求。服务器端响应消息里指明了这些主量所包含的分区,每个分区都有哪些副本,以及哪个副本是首领。

客户端会把元数据信息缓存起来,并时不是刷新这些信息(通过metadata.max.age.ms配置)

生产者请求时,会进行一些验证:

  • 发送数据的用户是否有主题写入权限?
  • 请求里包含的acks值 是否有效(只允许出现0、1或all)?
  • 如果acks=all,是否有足够多的同步副本保证消息已经被安全写入?


分享到:


相關文章: