kafka的編程模型

1.kafka消費者編程模型

分區消費模型

組(group)消費模型

1.1.分區消費模型

1.1.1.分區消費架構圖,每個分區對應一個消費者。

kafka的編程模型

1.1.2.分區消費模型偽代碼描述

kafka的編程模型

指定偏移量,用於從上次消費的地方開始消費.

提交offset ,java客戶端會自動提交的集群,所以這一步可選。

1.2.組(group)消費模型

1.2.1.組消費模型架構圖

每個組都消費該topic的全量數據,一條消息會發給groupA和groupB.

kafka的編程模型

1.2.2.組消費模型偽代碼:

kafka的編程模型

流數N:表示一個consumer組裡面有幾個consumer 實例,上例中組A創建2個流,組B創建4個流。

1.2.3.consumer分配算法

當kafka的分區個數大於組A裡consumer實例個數時,怎麼去分配,以下為分配步驟:

kafka的編程模型

1.3.兩種消費模型對比

Partition消費模型更加靈活但是:

(1)需要自己處理各種異常情況;

(2)需要自己管理offset(以實現消息傳遞的其他語義);

Group消費模型更加簡單,但是不靈活:

(1)不需要自己處理異常情況,不需要自己管理offset;

(2)只能實現kafka默認的最少一次消息傳遞語義;

知識補充:消息傳遞的3中語義:

至少一次,(消息不會丟,消息者至少得到一次,但有可能會重複,生產者向消費者發送之後,會等待消費者確認,沒收到確認會再發) (kafka 默認實現的語義)。

至多一次,(消息會丟)

有且只有一次。

1.4.java 客戶端參數調優

fetchSize: 從服務器獲取單包大小;

bufferSize: kafka客戶端緩衝區大小;

group.id: 分組消費時分組名 (指定的每個組將獲得全量的數據)

2.生產者消費模型

同步生產模型

異步生產模型

2.1. 同步生產模型

至少成功一次 , 發送給kafka消費者

kafka的編程模型

2.2.異步生產模型

打包發送給kafka broker。

kafka的編程模型

2.3.兩種生產模型偽代碼描述

main()

創建到kafka broker的連接:KafkaClient(host,port)

選擇或者自定義生產者負載均衡算法 partitioner (算法有:hash,輪詢,隨機)

設置生產者參數 (緩存隊列長度,發送時間,同步/異步參數設置)

根據負載均衡算法和設置的生產者參數構造Producer對象

while True

getMessage:從上游獲得一條消息

按照kafka要求的消息格式構造kafka消息

根據分區算法得到分區

發送消息

處理異常

2.4.兩種生產模型對比

同步生產模型:

(1)低消息丟失率;

(2)高消息重複率(由於網絡原因,回覆確認未收到);

(3)高延遲 (每發一條消息需要確認)

(使用在不丟消息場景)

異步生產模型:

(1)低延遲;

(2)高發送性能;(每秒一個分區發50萬條)

(3)高消息丟失率(無確認機制,發送端隊列滿了,消息會丟掉;整個隊列發送給)

(使用在允許丟消息場景,偶爾丟一條)

2.5.java客戶端代碼實現 (自定義分區)

//同步配置參數:

默認的序列化方式:字節序列化。

設定分區算法:默認是對key進行hash分區算法,可以自定義分區算法。

確認機制 request.require.acks: 合理設置為1; 0: 絕不等確認 1: leader的一個副本收到這條消息,併發回確認 -1: leader的所有副本都收到這條消息,併發回確認

消息是以key-value的形式發送的,key必須要設置。

2.6.java客戶端參數調優

message.send.max.retries: 發送失敗重試次數;

retry.backoff.ms :未接到確認,認為發送失敗的時間;

producer.type: 同步發送或者異步發送;

batch.num.messages: 異步發送時,累計最大消息數;

queue.buffering.max.ms:異步發送時,累計最大時間;


分享到:


相關文章: