Kafka-消費者提交偏移量

偏移量的作用,一個消費者組消費一個主題的過程中,如果有消費者發生崩潰或者有新的消費者加入群組,就會觸發再均衡,完成再均衡後,每個消費者可能會分配到新的分區,為了能夠繼續之前的消息讀取,消費者需要一個記錄上一次讀取到的位置的偏移量,之後從這個偏移量指定的位置開始讀取。

在新版 Kafka 中,消費者提交偏移量是通過向 __consumer_offset 特殊主題發送包含每個分區的偏移量來實現的。

KafkaConsumer API 提供了多種方式來提交偏移量:

  • 自動提交
  • 提交當前偏移量
  • 異步提交
  • 同步和異步組合提交
  • 提交特定的偏移量

自動提交

如果 enable.auto.commit 被設為 true,根據 auto.commit.interval.ms 設置的提交時間間隔,消費者會自動把從 poll() 方法接收到的最大偏移量提交上去。

自動提交是在輪詢中進行的,消費者每次在進行輪詢時會檢查是否該提交偏移量了,如果是,就會提交從上一次輪詢返回的偏移量。

自動提交方式雖然使用簡單,但是因為是週期性的提交時間間隔,可能會發生在某個時間間隔內讀取的數據的偏移量未提交,發生了再均衡,這樣造成讀取的數據重複,且自動提交併沒有辦法去避免重複處理消息,即使將時間間隔減小,但也沒辦法完全避免。

提交當前偏移量

消費者 API 提供了另一種提交偏移量的方式,開發者可以在必要的時候提交當前偏移量。

需要將 enable.auto.commit 設為 false,讓應用程序使用 commitSync() 來決定何時提交偏移量,這個 API 會提交由 poll() 方法返回的最新偏移量,提交成功後馬上返回,如果提交失敗就拋出異常。

異步提交

commitSync() 提交有一個不足,在 broker 對提交請求作出響應之前,應用程序會一直阻塞,這樣會限制應用程序的吞吐量。這個時候可以使用異步提交 API,只需要發送提交請求,無需等待 broker 的響應。

在成功提交或碰到無法提交的錯誤之前,commitSync() 會一直嘗試,但 commitAsync() 不會,這是因為如果它在得不到服務器響應的時候,進行重試,可能會覆蓋掉已經提交成功的更大的偏移量,這個時候再發生再均衡,就會出現重複消息。

commitAsync() 方法也支持回調,在 broker 作出響應時會執行回調,回調經常被用於記錄提交錯誤或生成度量指標。

同步和異步組合提交

在關閉消費者或者再均衡前的最後一次提交,必須要確保提交成功,因此,再消費者關閉前一般會組合使用 commitAsync() 和 commitSync()。它的工作原理如下:

try{
while(true){
consumer.poll(Duration.ofSeconds(1));
// ....
consumer.commitAsync();
}
}catch(Exception e){
// ...
}finally{
try{
consumer.commitSync();
}finally{
consumer.close();
}
}

先使用 commitAsync() 方法來提交,這樣的速度更快,而且即使這次提交失敗,但下次可能會成功,直到關閉消費者,沒有所謂的下一次提交了,使用 commitSync() 會一直重試,知道提交成功或發生無法恢復的錯誤。

提交特定的偏移量

消費者 API 允許在調用 commitSync() 和 commitAsync() 方法時傳入提交的分區和偏移量的 map,方法的定義如下:

public void commitSync(final Map<topicpartition> offsets)
public void commitSync(final Map<topicpartition> offsets, final Duration timeout)
public void commitAsync(final Map<topicpartition> offsets, OffsetCommitCallback callback)
/<topicpartition>/<topicpartition>/<topicpartition>

本文由博客一文多發平臺 https://openwrite.cn?from=article_bottom 發佈!


分享到:


相關文章: