kafka基於ZK消費者模型分析

消費者客戶端設計基本思想

a),同一個Partition不會同時被一個以上的消費者消費,更詳細的說:

b),同一個Partition只能被一個消費組中的一個消費者消費

c),一個消費者可以消費多個Partition

d),一個Partition只會指定給一個消費者

e),每一個消費組共享所有Partition,也就是同一個消費組下消費者對Partition是互斥的,而不同消費組之間是共享的

f),一個Partition只會分配給一個消費者線程

簡單闡述下kafka的發佈訂閱和隊列模型

a),發佈訂閱模型:同一條數據會會被所有的消費組獲取,每個消費組只有一個消費者可以消費,前提每個消費組只有一個消費者

b),隊列模型:同上,如果一個消費組有多個消費者,同時只有一個消費組訂閱該主題則實現實現單播

消費組模型如圖所示:(官方圖片)

kafka基於ZK消費者模型分析

基於ZK設計流程

ZK作為去中心化的集群模式,在消費者模型解決的核心問題是需要消費者知道哪些生產者是可用的,即在kafka中消費者需要知道集群中哪些節點和哪些分片可用

首先需要存儲數據在ZK中,消費進度在使用ZK作為協調者的時候,消費進度會定時的保存在ZK上,在獲取新的Partition時候新的消費者都會從ZK中獲取數據進行

數據消費除此之外消費組的成員列表,主題和Partition也保存在ZK中,ZK會在消費者組節點下注冊消費者子節點。

啟動流程如下:

消費者在啟動時就需要指定消費組和需要依賴的ZK集群,連接ZK後需要獲取分配的Partition,然後創建對應的Partition消息流讀取數據

ZK會監聽Partition的變化,消費組的變化和會話超時信息,觸發新的事件進行Partition的重新分配。

負責消費Partition的每個消費者都是一個消費進程,該進程也可以創建多個線程消費分區數據,ConcurrentMessageListenerContainer為多線程消息監聽類

例如:一個消費者在同一個主題下創建三個個線程消費同一個主題的三個Partition和三個消費者(每個消費者一個消費線程)消費該主題的最終結果是一樣的。

每一個消費者都會針對每個主題創建多個線程,每一個線程對應一個隊列和消息流

一個線程容許被分配給多個Partition,多個Partition會共用同一個隊列和消息流。

每個消費者在啟動的時候都會訂閱三種事件,如上所說:會話超時,消費組變化和主題變化事件:

關於消費者和Partition變化後如何實現消費的再平衡,有個基本思路,消費組中的消費者發生變化如退出或者加入某一消費組,則該消費組已分配Partition的

數量和節點需要進行重新分配,但消費進度會進行傳遞,避免重複消費,即消費組成員變化引起所有消費者發生Rebalance,且消費者在Rebalance前後分配到的Partition

會完全不同,消費者和Partition的再分配主要由ZKREBALANCELISTENER類負責處理,詳細流程如下:

a),停止拉取線程防止數據重複

b),分配之前會先刪除原有ZK上保存的相關信息,如果沒有進行刪除操作則很有可能遇到同一Partition被多個消費者消費造成數據消費混亂

c),為所有Partition重新分配消費者

d),在分配Partition成功後則啟動創建拉取線程

其中關鍵點闡述如下:

a),每個消費者只有在獲取到Partition後才能拉取數據,才知道從哪裡拉取數據,在拉取之前還需要知道消費進度,即讀取的Partition偏移量,Partition信息對象會根據

從ZK中拉取的Partition,隊列和偏移量數據進行對象的構造,Partition信息被用到拉取線程中,這樣才能在分區被重新分配後保證各自消費的消息平滑遷移和過渡

b),拉取線程將數據填充到隊列後,消息流方可以從隊列中迭代出數據用於消費者消費,拉取線程也會針對同一主題多個節點的多個Partition進行網絡優化和請求合併

但不完全等同於生產者線程優化策略。

c),拉取線程在備份副本的使用上和消費者拉取管理器上的使用上有所不同:

消費者的使用上主要採用阻塞方式,並放入對應的消費隊列

而針對備份副本主要是為了做數據同步,目的不同,則採用異步的方式進行數據的拉取,存儲方式採用的和主節點保持一致

d),消息被放入隊列中以數據塊的方式進行存儲,一個數據塊對應一個Partition的消息集合

總結

基於ZK監聽器流程為高級API採用的方式,即0.9之前的設計流程,0.9之後使用Java對該架構進行了一次重構,主要變化在減少了SCALE和ZK的依賴,使代碼工程更加簡潔,核心改變

在於使用了kafka自己的分組協調機制來代替ZK的監聽器機制

啟動方式由原來的指定ZK集群替換成需要指定kafka集群,這樣在觸發分配分區和提交分區偏移量的時候,就發送給kafka協調者進行處理,提交分區偏移量則只會發送給協調者,後續會做更詳細的解析


分享到:


相關文章: