Kafka丟消息的處理

Kafka丟消息的處理

Kafka存在丟消息的問題,消息丟失會發生在Broker,Producer和Consumer三種。

Broker

Broker丟失消息是由於Kafka本身的原因造成的,kafka為了得到更高的性能和吞吐量,將數據異步批量的存儲在磁盤中。消息的刷盤過程,為了提高性能,減少刷盤次數,kafka採用了批量刷盤的做法。即,按照一定的消息量,和時間間隔進行刷盤。這種機制也是由於linux操作系統決定的。將數據存儲到linux操作系統種,會先存儲到頁緩存(Page cache)中,按照時間或者其他條件進行刷盤(從page cache到file),或者通過fsync命令強制刷盤。數據在page cache中時,如果系統掛掉,數據會丟失。

Kafka丟消息的處理

Broker在linux服務器上高速讀寫以及同步到Replica

上圖簡述了broker寫數據以及同步的一個過程。broker寫數據只寫到PageCache中,而pageCache位於內存。這部分數據在斷電後是會丟失的。pageCache的數據通過linux的flusher程序進行刷盤。刷盤觸發條件有三:

  • 主動調用sync或fsync函數
  • 可用內存低於閥值
  • dirty data時間達到閥值。dirty是pagecache的一個標識位,當有數據寫入到pageCache時,pagecache被標註為dirty,數據刷盤以後,dirty標誌清除。

Broker配置刷盤機制,是通過調用fsync函數接管了刷盤動作。從單個Broker來看,pageCache的數據會丟失。

Kafka沒有提供同步刷盤的方式。同步刷盤在RocketMQ中有實現,實現原理是將異步刷盤的流程進行阻塞,等待響應,類似ajax的callback或者是java的future。下面是一段rocketmq的源碼。

<code>GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());

service.putRequest(request);

boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); // 刷盤/<code>

為了解決該問題,kafka通過producer和broker協同處理單個broker丟失參數的情況。一旦producer發現broker消息丟失,即可自動進行retry。除非retry次數超過閥值(可配置),消息才會丟失。此時需要生產者客戶端手動處理該情況。那麼producer是如何檢測到數據丟失的呢?是通過ack機制,類似於http的三次握手的方式。

  • acks=0,producer不等待broker的響應,效率最高,但是消息很可能會丟。
  • acks=1,leader broker收到消息後,不等待其他follower的響應,即返回ack。也可以理解為ack數為1。此時,如果follower還沒有收到leader同步的消息leader就掛了,那麼消息會丟失。按照上圖中的例子,如果leader收到消息,成功寫入PageCache後,會返回ack,此時producer認為消息發送成功。但此時,按照上圖,數據還沒有被同步到follower。如果此時leader斷電,數據會丟失。
  • acks=-1,leader broker收到消息後,掛起,等待所有ISR列表中的follower返回結果後,再返回ack。-1等效與all。這種配置下,只有leader寫入數據到pagecache是不會返回ack的,還需要所有的ISR返回“成功”才會觸發ack。如果此時斷電,producer可以知道消息沒有被髮送成功,將會重新發送。如果在follower收到數據以後,成功返回ack,leader斷電,數據將存在於原來的follower中。在重新選舉以後,新的leader會持有該部分數據。數據從leader同步到follower,需要2步:數據從pageCache被刷盤到disk。因為只有disk中的數據才能被同步到replica。數據同步到replica,並且replica成功將數據寫入PageCache。在producer得到ack後,哪怕是所有機器都停電,數據也至少會存在於leader的磁盤內。

上面第三點提到了ISR的列表的follower,需要配合另一個參數才能更好的保證ack的有效性。ISR是Broker維護的一個“可靠的follower列表”,in-sync Replica列表,broker的配置包含一個參數:min.insync.replicas。該參數表示ISR中最少的副本數。如果不設置該值,ISR中的follower列表可能為空。此時相當於acks=1。

Kafka丟消息的處理

如上圖中:

  • acks=0,總耗時f(t) = f(1)。
  • acks=1,總耗時f(t) = f(1) + f(2)。
  • acks=-1,總耗時f(t) = f(1) + max( f(A) , f(B) ) + f(2)。

性能依次遞減,可靠性依次升高。

Producer

Producer丟失消息,發生在生產者客戶端。

為了提升效率,減少IO,producer在發送數據時可以將多個請求進行合併後發送。被合併的請求咋發送一線緩存在本地buffer中。緩存的方式和前文提到的刷盤類似,producer可以將請求打包成“塊”或者按照時間間隔,將buffer中的數據發出。通過buffer我們可以將生產者改造為異步的方式,而這可以提升我們的發送效率。

但是,buffer中的數據就是危險的。在正常情況下,客戶端的異步調用可以通過callback來處理消息發送失敗或者超時的情況,但是,一旦producer被非法的停止了,那麼buffer中的數據將丟失,broker將無法收到該部分數據。又或者,當Producer客戶端內存不夠時,如果採取的策略是丟棄消息(另一種策略是block阻塞),消息也會被丟失。抑或,消息產生(異步產生)過快,導致掛起線程過多,內存不足,導致程序崩潰,消息丟失。

Kafka丟消息的處理

producer採取批量發送的示意圖

Kafka丟消息的處理

異步發送消息生產速度過快的示意圖

根據上圖,可以想到幾個解決的思路:

  • 異步發送消息改為同步發送消。或者service產生消息時,使用阻塞的線程池,並且線程數有一定上限。整體思路是控制消息產生速度。
  • 擴大Buffer的容量配置。這種方式可以緩解該情況的出現,但不能杜絕。
  • service不直接將消息發送到buffer(內存),而是將消息寫到本地的磁盤中(數據庫或者文件),由另一個(或少量)生產線程進行消息發送。相當於是在buffer和service之間又加了一層空間更加富裕的緩衝層。

Consumer

Consumer消費消息有下面幾個步驟:

  1. 接收消息
  2. 處理消息
  3. 反饋“處理完畢”(commited)

Consumer的消費方式主要分為兩種:

  • 自動提交offset,Automatic Offset Committing
  • 手動提交offset,Manual Offset Control

Consumer自動提交的機制是根據一定的時間間隔,將收到的消息進行commit。commit過程和消費消息的過程是異步的。也就是說,可能存在消費過程未成功(比如拋出異常),commit消息已經提交了。此時消息就丟失了。

<code>Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
// 自動提交開關
props.put("enable.auto.commit", "true");
// 自動提交的時間間隔,此處是1s
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<string> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
// 調用poll後,1000ms後,消息狀態會被改為 committed
ConsumerRecords<string> records = consumer.poll(100);
for (ConsumerRecord<string> record : records)
insertIntoDB(record); // 將消息入庫,時間可能會超過1000ms
}/<string>/<string>/<string>/<code>

上面的示例是自動提交的例子。如果此時,`insertIntoDB(record)`發生異常,消息將會出現丟失。接下來是手動提交的例子:

<code>Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
// 關閉自動提交,改為手動提交
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<string> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
final int minBatchSize = 200;
List<consumerrecord>> buffer = new ArrayList<>();
while (true) {
// 調用poll後,不會進行auto commit
ConsumerRecords<string> records = consumer.poll(100);
for (ConsumerRecord<string> record : records) {
buffer.add(record);
}
if (buffer.size() >= minBatchSize) {

insertIntoDb(buffer);
// 所有消息消費完畢以後,才進行commit操作
consumer.commitSync();
buffer.clear();
}
}/<string>/<string>/<consumerrecord>/<string>/<code>

將提交類型改為手動以後,可以保證消息“至少被消費一次”(at least once)。但此時可能出現重複消費的情況,重複消費不屬於本篇討論範圍。

上面兩個例子,是直接使用Consumer的High level API,客戶端對於offset等控制是透明的。也可以採用Low level API的方式,手動控制offset,也可以保證消息不丟,不過會更加複雜。

<code>try {
while(running) {
ConsumerRecords<string> records = consumer.poll(Long.MAX_VALUE);
for (TopicPartition partition : records.partitions()) {
List<consumerrecord>> partitionRecords = records.records(partition);
for (ConsumerRecord<string> record : partitionRecords) {
System.out.println(record.offset() + ": " + record.value());
}
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
// 精確控制offset
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
}
} finally {
consumer.close();
}/<string>/<consumerrecord>/<string>/<code>


分享到:


相關文章: