Kafka Data Replication(副本策略)

<strong>1.消息傳遞同步策略

Producer在發佈消息到某個Partition時,先通過ZooKeeper找到該Partition的Leader,然後無論該Topic的Replication Factor為多少,Producer只將該消息發送到該Partition的Leader。Leader會將該消息寫入其本地Log。每個Follower都從Leader pull數據。這種方式上,Follower存儲的數據順序與Leader保持一致。Follower在收到該消息並寫入其Log後,向Leader發送ACK。一旦Leader收到了ISR中的所有Replica的ACK,該消息就被認為已經commit了,Leader將增加HW並且向Producer發送ACK。

為了提高性能,每個Follower在接收到數據後就立馬向Leader發送ACK,而非等到數據寫入Log中。因此,對於已經commit的消息,Kafka只能保證它被存於多個Replica的內存中,而不能保證它們被持久化到磁盤中,也就不能完全保證異常發生後該條消息一定能被Consumer消費。

Consumer讀消息也是從Leader讀取,只有被commit過的消息才會暴露給Consumer。

Kafka Replication的數據流如下圖所示:

Kafka Data Replication(副本策略)

<strong>2 ACK前需要保證有多少個備份

對於Kafka而言,定義一個Broker是否“活著”包含兩個條件:

  • 一是它必須維護與ZooKeeper的session(這個通過ZooKeeper的Heartbeat機制來實現)。
  • 二是Follower必須能夠及時將Leader的消息複製過來,不能“落後太多”。

Leader會跟蹤與其保持同步的Replica列表,該列表稱為ISR(即in-sync Replica)。如果一個Follower宕機,或者落後太多,Leader將把它從ISR中移除。這裡所描述的“落後太多”指Follower複製的消息落後於Leader後的條數超過預定值(該值可在$KAFKA_HOME/config/server.properties中通過replica.lag.max.messages配置,其默認值是4000)或者Follower超過一定時間(該值可在$KAFKA_HOME/config/server.properties中通過replica.lag.time.max.ms來配置,其默認值是10000)未向Leader發送fetch請求。

Kafka的複製機制既不是完全的同步複製,也不是單純的異步複製。事實上,完全同步複製要求所有能工作的Follower都複製完,這條消息才會被認為commit,這種複製方式極大的影響了吞吐率(高吞吐率是Kafka非常重要的一個特性)。而異步複製方式下,Follower異步的從Leader複製數據,數據只要被Leader寫入log就被認為已經commit,這種情況下如果Follower都複製完都落後於Leader,而如果Leader突然宕機,則會丟失數據。而Kafka的這種使用ISR的方式則很好的均衡了確保數據不丟失以及吞吐率。Follower可以批量的從Leader複製數據,這樣極大的提高複製性能(批量寫磁盤),極大減少了Follower與Leader的差距。

需要說明的是,Kafka只解決fail/recover,不處理“Byzantine”(“拜占庭”)問題。一條消息只有被ISR裡的所有Follower都從Leader複製過去才會被認為已提交。這樣就避免了部分數據被寫進了Leader,還沒來得及被任何Follower複製就宕機了,而造成數據丟失(Consumer無法消費這些數據)。而對於Producer而言,它可以選擇是否等待消息commit,這可以通過request.required.acks來設置。這種機制確保了只要ISR有一個或以上的Follower,一條被commit的消息就不會丟失。

<strong>3.Leader Election算法

Leader選舉本質上是一個分佈式鎖,有兩種方式實現基於ZooKeeper的分佈式鎖:

  • 節點名稱唯一性:多個客戶端創建一個節點,只有成功創建節點的客戶端才能獲得鎖
  • 臨時順序節點:所有客戶端在某個目錄下創建自己的臨時順序節點,只有序號最小的才獲得鎖

一種非常常用的選舉leader的方式是“Majority Vote”(“少數服從多數”),但Kafka並未採用這種方式。這種模式下,如果我們有2f+1個Replica(包含Leader和Follower),那在commit之前必須保證有f+1個Replica複製完消息,為了保證正確選出新的Leader,fail的Replica不能超過f個。因為在剩下的任意f+1個Replica裡,至少有一個Replica包含有最新的所有消息。這種方式有個很大的優勢,系統的latency只取決於最快的幾個Broker,而非最慢那個。Majority Vote也有一些劣勢,為了保證Leader Election的正常進行,它所能容忍的fail的follower個數比較少。如果要容忍1個follower掛掉,必須要有3個以上的Replica,如果要容忍2個Follower掛掉,必須要有5個以上的Replica。也就是說,在生產環境下為了保證較高的容錯程度,必須要有大量的Replica,而大量的Replica又會在大數據量下導致性能的急劇下降。這就是這種算法更多用在ZooKeeper這種共享集群配置的系統中而很少在需要存儲大量數據的系統中使用的原因。例如HDFS的HA Feature是基於majority-vote-based journal,但是它的數據存儲並沒有使用這種方式。

Kafka在ZooKeeper中動態維護了一個ISR(in-sync replicas),這個ISR裡的所有Replica都跟上了leader,只有ISR裡的成員才有被選為Leader的可能。在這種模式下,對於f+1個Replica,一個Partition能在保證不丟失已經commit的消息的前提下容忍f個Replica的失敗。在大多數使用場景中,這種模式是非常有利的。事實上,為了容忍f個Replica的失敗,Majority Vote和ISR在commit前需要等待的Replica數量是一樣的,但是ISR需要的總的Replica的個數幾乎是Majority Vote的一半。

雖然Majority Vote與ISR相比有不需等待最慢的Broker這一優勢,但是Kafka作者認為Kafka可以通過Producer選擇是否被commit阻塞來改善這一問題,並且節省下來的Replica和磁盤使得ISR模式仍然值得。

<strong>4.如何處理所有Replica都不工作

在ISR中至少有一個follower時,Kafka可以確保已經commit的數據不丟失,但如果某個Partition的所有Replica都宕機了,就無法保證數據不丟失了。這種情況下有兩種可行的方案:

1.等待ISR中的任一個Replica“活”過來,並且選它作為Leader

2.選擇第一個“活”過來的Replica(不一定是ISR中的)作為Leader

這就需要在可用性和一致性當中作出一個簡單的折衷。如果一定要等待ISR中的Replica“活”過來,那不可用的時間就可能會相對較長。而且如果ISR中的所有Replica都無法“活”過來了,或者數據都丟失了,這個Partition將永遠不可用。選擇第一個“活”過來的Replica作為Leader,而這個Replica不是ISR中的Replica,那即使它並不保證已經包含了所有已commit的消息,它也會成為Leader而作為consumer的數據源(前文有說明,所有讀寫都由Leader完成)。Kafka0.8.*使用了第二種方式。根據Kafka的文檔,在以後的版本中,Kafka支持用戶通過配置選擇這兩種方式中的一種,從而根據不同的使用場景選擇高可用性還是強一致性。

<strong>5 選舉Leader

最簡單最直觀的方案是,所有Follower都在ZooKeeper上設置一個Watch,一旦Leader宕機,其對應的ephemeral znode會自動刪除,此時所有Follower都嘗試創建該節點,而創建成功者(ZooKeeper保證只有一個能創建成功)即是新的Leader,其它Replica即為Follower。

但是該方法會有3個問題:

1.split-brain 這是由ZooKeeper的特性引起的,雖然ZooKeeper能保證所有Watch按順序觸發,但並不能保證同一時刻所有Replica“看”到的狀態是一樣的,這就可能造成不同Replica的響應不一致

2.herd effect 如果宕機的那個Broker上的Partition比較多,會造成多個Watch被觸發,造成集群內大量的調整

3.ZooKeeper負載過重 每個Replica都要為此在ZooKeeper上註冊一個Watch,當集群規模增加到幾千個Partition時ZooKeeper負載會過重。

Kafka 0.8.*的Leader Election方案解決了上述問題,它在所有broker中選出一個controller,所有Partition的Leader選舉都由controller決定。controller會將Leader的改變直接通過RPC的方式(比ZooKeeper Queue的方式更高效)通知需為為此作為響應的Broker。同時controller也負責增刪Topic以及Replica的重新分配。


分享到:


相關文章: