09.04 Kafka 源碼:KafkaConsumer 消費處理

Kafka消費者客戶端從Kafka cluster中讀取消息並處理。

Kafka消費者可以手動綁定自己到某個topic的某些partition上或者通過subscribe方法監聽某個topic自動綁定。Kafka消費者綁定到某個parition後就和這個partition的leader連接,然後發出fetch request, 獲取消息後進行處理。

offset管理

kafka的消費模型是一個partition最多被一個consumer消費,而offset可以有consumer控制,例如通過seek前進或後退到某個offset位置。

首次連接時,可以通過KafkaConsumer配置參數裡的auto.offset.reset參數決定是從最新的位置(默認)還是從就早的位置開始消費。

默認情況下, enable.auto.commit參數是true,即KafkaConsumer客戶端會定時commit offset,所有要注意的一點是如果poll函數得到ConsumerRecords後如果處理是異步的,則可能出現消費處理還沒有完成但是卻commit offset了,這時如果進程掛掉則重啟後則會發生丟消息的情況。這裡有兩種解決方案,1是poll後的處理是同步的,這樣下一次poll會嘗試commit offset,則能保證at least one語義。2是關閉enable.auto.commit, 然後通過KafkaConsumer.commitSync方法來手動commit offset。

max.poll.interval.ms參數用於設置kafka消費者處理一次poll的消費結果的最大時間(默認300s),如果超過了這個時間則consumer被認為掛了會重新rebalance。

Consumer線程相關

消費者多線程處理有幾種方式

  1. 每個consumer只由一個線程處理,優點是能保證partition內有序和實現簡單,缺點是併發能力受限於partition的數量
  2. 將consumption和process過程分離,即consumer拉到一個消息後傳遞給另一個線程或線程池處理,這裡提高了併發能力但是需要注意多線程處理中的順序問題不再保證以及可能出現consumer提交了offset而線程池沒處理完的情況,另外線程池要注意處理慢導致的內存隊列積壓問題。

KafkaConsumer.subscribe

監聽某個topic

subscribe(Collection topics, ConsumerRebalanceListener listener)

當消費者使用kafka cluster來管理group membership時,ConsumerRebalanceListener會在consumer rebalance時調用,consumer rebalance發生在消費者或消費關係變化的時候

  • 某個消費進程掛掉
  • 新消費進程加入
  • partition數量發生變化時

這個Listener的常見用途是保存這個partition的最新消費offset,在void onPartitionsRevoked(java.util.Collection<topicpartition> partitions)裡保存當前的partition和offset到數據庫中。然後reassign完成後,void onPartitionsAssigned(java.util.Collection partitions)中從數據庫讀取之前的消費位置,通過seek方法設置消費位置繼續消費。/<topicpartition>

Kafka.poll

public ConsumerRecords poll(long timeout) {

// KafkaConsumer不是線程安全的

acquireAndEnsureOpen();

try {

if (timeout < 0)

throw new IllegalArgumentException("Timeout must not be negative");

if (this.subscriptions.hasNoSubscriptionOrUserAssignment())

throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");

// poll for new data until the timeout expires

long start = time.milliseconds();

long remaining = timeout;

do {

Map<topicpartition>>> records = pollOnce(remaining);/<topicpartition>

if (!records.isEmpty()) {

// before returning the fetched records, we can send off the next round of fetches

// and avoid block waiting for their responses to enable pipelining while the user

// is handling the fetched records.

//

// NOTE: since the consumed position has already been updated, we must not allow

// wakeups or any other errors to be triggered prior to returning the fetched records.

if (fetcher.sendFetches() > 0 || client.hasPendingRequests())

client.pollNoWakeup();

if (this.interceptors == null)

return new ConsumerRecords<>(records);

else

return this.interceptors.onConsume(new ConsumerRecords<>(records));

}

long elapsed = time.milliseconds() - start;

remaining = timeout - elapsed;

} while (remaining > 0);

return ConsumerRecords.empty();

} finally {

release();

}

}

pollOnce處理

private Map<topicpartition>>> pollOnce(long timeout) {/<topicpartition>

client.maybeTriggerWakeup();

// 協調者進行一次poll,裡面會根據auto.commit.interval.ms決定是否自動提交offset

coordinator.poll(time.milliseconds(), timeout);

// fetch positions if we have partitions we're subscribed to that we

// don't know the offset for

if (!subscriptions.hasAllFetchPositions())

updateFetchPositions(this.subscriptions.missingFetchPositions());

// 如果已經有record數據了直接返回

// if data is available already, return it immediately

Map<topicpartition>>> records = fetcher.fetchedRecords();/<topicpartition>

if (!records.isEmpty())

return records;

// 發送一次fetch請求

// send any new fetches (won't resend pending fetches)

fetcher.sendFetches();

long now = time.milliseconds();

long pollTimeout = Math.min(coordinator.timeToNextPoll(now), timeout);

// 等待fetch請求結果

client.poll(pollTimeout, now, new PollCondition() {

@Override

public boolean shouldBlock() {

// since a fetch might be completed by the background thread, we need this poll condition

// to ensure that we do not block unnecessarily in poll()

return !fetcher.hasCompletedFetches();

}

});

// after the long poll, we should check whether the group needs to rebalance

// prior to returning data so that the group can stabilize faster

if (coordinator.needRejoin())

return Collections.emptyMap();

// 返回fetch結果

return fetcher.fetchedRecords();

}


分享到:


相關文章: