面試官:說說你如何保證Kafka不丟失消息?

kafka如何保證不丟消息

ps:這篇文章自我感覺說的很大白話了!希望你們看過了之後能有收穫。

生產者丟失消息的情況

生產者(Producer) 調用send方法發送消息之後,消息可能因為網絡問題並沒有發送過去。

所以,我們不能默認在調用send方法發送消息之後消息消息發送成功了。為了確定消息是發送成功,我們要判斷消息發送的結果。但是要注意的是 Kafka 生產者(Producer) 使用 send 方法發送消息實際上是異步的操作,我們可以通過 get()方法獲取調用結果,但是這樣也讓它變為了同步操作,示例代碼如下:

詳細代碼見我的這篇文章:Kafka系列第三篇!10 分鐘學會如何在 Spring Boot 程序中使用 Kafka 作為消息隊列?

<code>SendResult<string> sendResult = kafkaTemplate.send(topic, o).get();
if (sendResult.getRecordMetadata() != null) {
logger.info("生產者成功發送消息到" + sendResult.getProducerRecord().topic() + "-> " + sendRe
sult.getProducerRecord().value().toString());
}/<string>/<code>

但是一般不推薦這麼做!可以採用為其添加回調函數的形式,示例代碼如下:

<code>        ListenableFuture<sendresult>> future = kafkaTemplate.send(topic, o);
future.addCallback(result -> logger.info("生產者成功發送消息到topic:{} partition:{}的消息", result.getRecordMetadata().topic(), result.getRecordMetadata().partition()),
ex -> logger.error("生產者發送消失敗,原因:{}", ex.getMessage()));/<sendresult>/<code>

如果消息發送失敗的話,我們檢查失敗的原因之後重新發送即可!

另外這裡推薦為 Producer 的retries (重試次數)設置一個比較合理的值,一般是 3 ,但是為了保證消息不丟失的話一般會設置比較大一點。設置完成之後,當出現網絡問題之後能夠自動重試消息發送,避免消息丟失。另外,建議還要設置重試間隔,因為間隔太小的話重試的效果就不明顯了,網絡波動一次你3次一下子就重試完了

消費者丟失消息的情況

我們知道消息在被追加到 Partition(分區)的時候都會分配一個特定的偏移量(offset)。偏移量(offset)表示 Consumer 當前消費到的 Partition(分區)的所在的位置。Kafka 通過偏移量(offset)可以保證消息在分區內的順序性。

面試官:說說你如何保證Kafka不丟失消息?

當消費者拉取到了分區的某個消息之後,消費者會自動提交了 offset。自動提交的話會有一個問題,試想一下,當消費者剛拿到這個消息準備進行真正消費的時候,突然掛掉了,消息實際上並沒有被消費,但是 offset 卻被自動提交了。

解決辦法也比較粗暴,我們手動關閉閉自動提交 offset,每次在真正消費完消息之後之後再自己手動提交 offset 。 但是,細心的朋友一定會發現,這樣會帶來消息被重新消費的問題。比如你剛剛消費完消息之後,還沒提交 offset,結果自己掛掉了,那麼這個消息理論上就會被消費兩次。

Kafka 弄丟了消息

我們知道 Kafka 為分區(Partition)引入了多副本(Replica)機制。分區(Partition)中的多個副本之間會有一個叫做 leader 的傢伙,其他副本稱為 follower。我們發送的消息會被髮送到 leader 副本,然後 follower 副本才能從 leader 副本中拉取消息進行同步。生產者和消費者只與 leader 副本交互。你可以理解為其他副本只是 leader 副本的拷貝,它們的存在只是為了保證消息存儲的安全性。

試想一種情況:假如 leader 副本所在的 broker 突然掛掉,那麼就要從 follower 副本重新選出一個 leader ,但是 leader 的數據還有一些沒有被 follower 副本的同步的話,就會造成消息丟失。

設置 acks = all

解決辦法就是我們設置 acks = all。acks 是 Kafka 生產者(Producer) 很重要的一個參數。

acks 的默認值即為1,代表我們的消息被leader副本接收之後就算被成功發送。當我們配置 acks = all 代表則所有副本都要接收到該消息之後該消息才算真正成功被髮送。

設置 replication.factor >= 3

為了保證 leader 副本能有 follower 副本能同步消息,我們一般會為 topic 設置 replication.factor >= 3。這樣就可以保證每個 分區(partition) 至少有 3 個副本。雖然造成了數據冗餘,但是帶來了數據的安全性。

設置 min.insync.replicas > 1

一般情況下我們還需要設置 min.insync.replicas> 1 ,這樣配置代表消息至少要被寫入到 2 個副本才算是被成功發送。min.insync.replicas 的默認值為 1 ,在實際生產中應儘量避免默認值 1。

但是,為了保證整個 Kafka 服務的高可用性,你需要確保

replication.factor > min.insync.replicas 。為什麼呢?設想一下加入兩者相等的話,只要是有一個副本掛掉,整個分區就無法正常工作了。這明顯違反高可用性!一般推薦設置成 replication.factor = min.insync.replicas + 1

設置 unclean.leader.election.enable = false

Kafka 0.11.0.0版本開始 unclean.leader.election.enable 參數的默認值由原來的true 改為false

我們最開始也說了我們發送的消息會被髮送到 leader 副本,然後 follower 副本才能從 leader 副本中拉取消息進行同步。多個 follower 副本之間的消息同步情況不一樣,當我們配置了 unclean.leader.election.enable = false 的話,當 leader 副本發生故障時就不會從 follower 副本中和 leader 同步程度達不到要求的副本中選擇出 leader ,這樣降低了消息丟失的可能性。


分享到:


相關文章: