Twitter高性能分佈式日誌系統架構解析「轉」

為什麼需要分佈式日誌?



Twitter高性能分佈式日誌系統架構解析「轉」

日誌應該是程序員最熟悉的一種數據結構。它存在於大家每天的工作中。它是一組只追加,嚴格有序的記錄序列。它長得像上圖這個樣子。日誌已被證明是一種很有效的數據結構,可用來解決很多分佈式系統的問題。在 Twitter,我們就用日誌來解決很多有挑戰的分佈式系統問題。


這裡主要舉一個例子。我們如何使用日誌在 Manhattan(Twitter的最終一致性分佈式Key/Value數據庫)中實現 Compare-And-Set 這樣的強一致性操作。


Twitter高性能分佈式日誌系統架構解析「轉」


這是一張 Manhattan 架構的簡單抽象圖。Manhattan 主要由 3 個組件構成,client, co-ordinator 和 replicas。Client 將請求發送給 co-ordinator,co-ordinator 找出修改鍵值 (key) 所對應的 replicas。然後修改 replicas。Co-ordinator 在發送請求的時候會附上相應的時間戳,replica 根據時間戳來決定最後哪個修改成功,實現最終一致性。


Twitter高性能分佈式日誌系統架構解析「轉」

如果我們需要在這個最終一致性的系統上實現 CAS(Compare-And-Set) 這樣的強一致性操作,會碰到什麼樣的問題呢?衝突!


“衝突”是什麼意思呢?舉個例子,假設有兩個 Client,它們同時想要修改 key x,但修改成不同的結果。綠色的 Client 想將 x 從 3 修改到 4,而紅色的 Client想將 x 從 3 修改到 5。


Twitter高性能分佈式日誌系統架構解析「轉」

假設綠色的 Client 成功地將第一個副本從 3 修改到 4;而紅色的 Client 成功地將第三個副本從 3 修改到 5。那麼綠色的 Client 修改第三個副本將會失敗,因為第三個副本的值已經變成了 5。同樣,紅色的 Client 修改第一個副本也會失敗。


Twitter高性能分佈式日誌系統架構解析「轉」

這是之前提到的“衝突”。因為你不知道這個系統中,x 的最終值應該是 4 還是 5。或者其他值。更嚴重的是,系統無法從這個“衝突”狀態中恢復,也就沒有最終一致性可言。


Twitter高性能分佈式日誌系統架構解析「轉」


解決辦法是什麼呢?日誌!使用日誌來序列化所有的請求。使用日誌後的請求流程將變成如圖所示:co-ordinator 將請求寫到日誌中。所有的 replicas 從日誌中按順序讀取請求,並修改本地的狀態。


Twitter高性能分佈式日誌系統架構解析「轉」


在這個例子中,修改為 4 的操作在修改為 5 的操作之前寫入日誌。因此,所有的副本會首先被修改成 4。那麼修改為 5 的操作將會失敗。到此為止,你可以看出日誌的好處。它將一個原本複雜的問題變得簡單。這種解決問題的思路叫做 Pub/Sub。而日誌就是 Pub/Sub 模式的基礎。因為 Pub/Sub 這個模式是那麼簡單而且強有力,這讓我們思考,是不是可以構建一個高可用的分佈式日誌服務,所有在 Twitter 的分佈式系統都可以複用這個日誌服務?構建一個分佈式日誌系統,首要的事情就是找出我們需要解決什麼問題,滿足什麼樣的需求。


Twitter高性能分佈式日誌系統架構解析「轉」

首先作為一個基本設施,存儲在日誌中的數據需要持久化,這樣它可以容忍宕機,避免數據丟失。


Twitter高性能分佈式日誌系統架構解析「轉」

因為需要作為分佈式系統的基礎設施,那麼在單機上持久化是遠遠不夠的。我們需要將數據複製到多臺機器上,提高數據和系統的可用性。


Twitter高性能分佈式日誌系統架構解析「轉」

當數據被複制到多臺機器上的時候,我們就需要保證數據的強一致性。否則,如果我們出現丟數據、數據不一致,那麼勢必影響到構建在分佈式日誌上的所有系統。如果日誌都不能相信了,你的生活還能相信誰呢 :)


Twitter 如何考慮這個問題?

為什麼持久化 (durability)、多副本 (replication) 和強一致性 (consistency),對我們來說這麼重要呢?


Twitter高性能分佈式日誌系統架構解析「轉」

我所在 Twitter 的組,是 messaging 組。主要負責 Twitter 的消息中間件(在 Twitter 內部服務之間搬運數據),比如 Kestrel(用於在線系統)、Kafka(用於離線分析)。這些系統都不支持嚴格的持久化,或者在支持持久化的情況下性能極差。它們採用定期回刷 (periodic flush) 磁盤或者依賴於文件系統 (pdflush) 來持久化數據。它們因為不支持持久化,所以當事故發生時,我們會丟數據。一旦數據丟失,運維繫統的人就會非常痛苦。我們經常被責問,如何才能定量丟失的數據。這就讓我們不禁在想,是否能夠構建這樣一個基礎服務,它的基石就是持久化和強一致的?


Twitter高性能分佈式日誌系統架構解析「轉」

在持久化和強一致性的基礎上,它又是高性能的:可以支持低延時的在線系統(OLTP),比如數據庫,支持實時的 (real-time)、高吞吐的流式分析和高通量的批量離線分析。同時能夠很好地擴展,以支持構建在分佈式日誌之上的系統的擴展性。在深入之前,先強調一點:我所提到的 “low latency” 和 “high throughput” 在分佈式日誌系統中指什麼?


Twitter高性能分佈式日誌系統架構解析「轉」

日誌系統的核心負載可以歸為三類:writes,tailing reads 和 catch-up reads。Writes 就是將數據追加到一個日誌中,tailing reads 就是從日誌的尾部讀最新的東西,而 catch-up reads 則是從比較早的位置開始讀日誌(比如數據庫中重建副本)。


Twitter高性能分佈式日誌系統架構解析「轉」


Writes 和 tailing reads 在意的是延時 (latency),因為它關係到一個消息能多快地從被寫入到被讀到。而 catch-up reads 在意的則是高吞吐量,因為它關係到是否能追趕到日誌的尾部。在一個“完美”的世界中,系統應該只有兩種負載,writes 和 tailing reads。而且大部分現有系統對於這兩種負載可以很好地應付。但是,在現實世界裡,這基本不可能。尤其在一個多租戶的環境裡,catch-up reads 通常成為影響系統的重要因素。舉個例子,以流式計算為例,用戶可能重啟一個 topology。而這個 topology 可能從很早地位置開始大量讀數據,從而引入大量的 catch-up reads。而這些 catch-up reads 在文件系統角度通常會表現為大批量的掃描,文件系統會進行大量的預讀取到 Page Cache 裡,從而擠掉最新的數據而影響寫操作和 tailing read操作。在設計這個分佈式日誌系統 DistributedLog 的時候,我們進行了各種調研。也同時基於運維已有系統 (kestrel, Kafka) 的經驗,我們最終決定基於 Apache BookKeeper進行構建。主要因為 Apache BookKeeper 提供的三個核心特性:I/O 分離、並行複製和容易理解的一致性模型。它們能夠很好地滿足我們對於持久化、多副本和一致性的要求。在深入解釋 Apache BookKeeper 的這些核心特性之前,我先簡單地說明一下 Apache BookKeeper。

Twitter 如何基於 Apache BookKeeper 構建 DistributeLog?

Apache BookKeeper 最早開始於 2008 年,是 Yahoo 巴塞羅那研究院的研究項目,首要目的是解決 HDFS NameNode 的可用性問題。後來成為 Apache ZooKeeper 的子項目。2014 年底,脫離 Apache ZooKeeper 成為頂級項目。目前被 Yahoo, Twitter,Salesforce 等公司使用。


Twitter高性能分佈式日誌系統架構解析「轉」

這張圖簡單地描述了 Apache BookKeeper 的樣子。它主要由三個組件構成,客戶端 (client),數據存儲節點 (Bookie) 和元數據存儲 Service Discovery(ZooKeeper)。Bookies在啟動的時候向 ZooKeeper 註冊節點。Client 通過 ZooKeeper 發現可用的 Bookie。在 Apache BookKeeper 中,讀寫操作的單元叫做 Ledger。Ledger 是一組追加有序的記錄。客戶端可以創建一個 Ledger,然後進行追加寫操作。每個 Ledger 會被賦予全局唯一的 ID。讀者可以根據 Ledger ID,打開 Ledger 進行讀操作。


Twitter高性能分佈式日誌系統架構解析「轉」

客戶端在創建 Ledger 的時候,從 Bookie Pool 裡面按照指定的數據放置策略挑選出一定數量的 Bookie,構成一個 Ensemble。


Twitter高性能分佈式日誌系統架構解析「轉」

每條被追加的記錄在寫者(Writer)會被賦予從 0 開始有序遞增的序號,稱為 Entry ID。每條 Entry 會被並行地發送給 Ensemble 裡面的所有 Bookies。並且所有 Entry的發送以流水線的方式進行。也就是意味著發送第 N + 1 條記錄的寫請求不需要等待發送第 N 條記錄的寫請求返回。對於每條 Entry 的寫操作而言,當它收到 Ensemble 裡面大多數 Bookie 的確認後,Client 認為這條記錄已經持久化到這個 Ensemble 中,並且有大多數副本,它就可以返回確認給 Application。寫記錄的發送可以亂序,但是確認 (Acknowledge) 則會按照 Entry ID 的順序進行有序確認。從而實現日誌的嚴格有序性。


Twitter高性能分佈式日誌系統架構解析「轉」

如果 Ensemble 裡面的存活的 Bookies 不能構成大多數,Client 會進行一個 Ensemble Change。Ensemble Change 將從 Bookie Pool 中根據數據放置策略挑選出額外的 Bookie 用來取代那些不存活的 Bookie (圖中粉色方塊)。通過 Ensemble Change 操作,Apache BookKeeper 保證寫操作的高可用性。理解 Apache BookKeeper 的讀操作之前,需要先說明一下 Apache BookKeeper 的一致性模型。


Twitter高性能分佈式日誌系統架構解析「轉」

對於 Writer 而言,write 不斷地添加記錄。每個記錄會被 writer 賦予一個嚴格遞增的 ID。所有的追加操作都是異步的。也就是寫第二條記錄不用等寫第一條記錄返回。所有寫成功的操作按照 ID 遞增順序 Ack 回 writer。


Twitter高性能分佈式日誌系統架構解析「轉」

伴隨著寫成功的 Acknowledges,writer 不斷地更新一個指針叫做 Last-Add-Confirmed (LAC)。所有 Entry ID 小於等於 LAC 的記錄保證持久化並複製到大多數副本上。而在 LAC 和 LAP (Last-Add-Pushed) 之間的記錄就是已經發送到 Bookies 但是尚未被確認寫成功的。


Twitter高性能分佈式日誌系統架構解析「轉」

所有的 Readers 都可以安全地讀取 Entry ID 小於或者等於 LAC 的記錄,從而保證 reader 不會讀到尚未被確認 (acknowledged) 的記錄,從而保證了讀者之間的一致性。在寫者方面,BookKeeper 並不進行任何主動的選主 (leader election) 操作。相反地,它提供了內置的 fencing 機制,防止出現多個寫者的狀態,從而保證寫者的一致性。Apache BookKeeper 沒有將很複雜的一致性機制捆綁在一起。寫者和讀者之間也沒有很複雜的協同機制。所有的一致性的協調就是通過這個 LAC 指針 (Last Add Confirmed)。這樣的做法,可以使得擴展寫者和擴展讀者相互分離。


Twitter高性能分佈式日誌系統架構解析「轉」

理解了 Apache BookKeeper 的一致性模型之後,我們再回來看它的讀操作。在 Apache BookKeeper中,主要有兩種讀操作:一種是讀指定的 Entry(圖(a)),另外一種是讀 LAC (圖(b))。因為 Entry 追加之後不再被修改,那麼在圖 (a) 中,客戶端可以到任意一個副本讀取相應的 Entry。為了保證低延時(獲得平滑的 p999), 我們使用了一個叫 Speculative Read 的機制。讀請求首先發送給第一個副本,在指定 timeout 的時間內,如果沒有收到 reponse,則發送讀請求給第二個副本,然後同時等待第一個和第二個副本。誰第一個返回,即讀取成功。通過有效的 Speculative Read,我們很大程度減小了 p999 延時的 spikes,達到可預測的低延時。另一個操作是讀取 LAC。這是讀者跟寫者之間的 Catch-Up 操作,保證讀者讀取到最新的數據。因此,它是採用的是 Quorum Read 的做法:從所有 Bookies 讀取最新的 LAC,然後等待大多數的答覆。Read Entries 和 Read LAC 構成了Reader的核心操作。在 Twitter,為了進一步降低延時,我們將兩種操作進行合併,形成 “Long Poll Read”(圖(c)). 客戶端發送 Long Poll 請求並在 Bookie 等待最新的 LAC 的更新,一旦寫者更新了 LAC,Bookie 返回更新後的 LAC 以及相應的 Entry。這樣可以有效地節省多輪網絡交互。同時對於 Long Poll Read,我們仍然採用 Speculative 機制,保證平滑的的可預測的 p999 延時。這是 BookKeeper 主要的核心讀寫流程,並行複製和一致性模型。


Twitter高性能分佈式日誌系統架構解析「轉」

Apache BookKeeper 作為基石,解決了分佈式日誌的核心問題。但是,它還是相對比較底層。


Twitter高性能分佈式日誌系統架構解析「轉」

作為一個共享的日誌服務,要在大的組織架構中被廣泛使用,首要的問題就是簡單性。我們需要讓用戶思考的是命名的日誌,而不是一組數字編號的 Ledgers。日誌應該是一組無止盡的記錄序列,提供更面向流式的接口。用戶只需要考慮如何將數據追加到流裡,以及如何從流裡讀取數據。如圖所示,紫色是一個 DistributedLog 的日誌。日誌被切分成不同的日誌段,每個日誌段被存成一個 Apache BookKeeper 的 Ledger。


Twitter高性能分佈式日誌系統架構解析「轉」

日誌被切分成不同的日誌段,每個日誌段被存成一個 Apache BookKeeper 的 Ledger。新的日誌段在一定時間或者舊的日誌段寫滿時會被創建。因為日誌被切割成基本上相近大小的日誌段,所以很容易將這些日誌分段分散到整個集群中,實現數據的均勻分佈。因為數據持續追加到日誌中,我們提供兩種方式刪除日誌。一種是精確的 truncation,對於數據庫這樣嚴格的複製狀態機(replicated state machines)的應用場景,它們需要嚴格控制哪個位置之前的數據是不再需要的。另外一個是基於時間的自動過期,它適用於不需要嚴格控制的數據分析場景。


Twitter高性能分佈式日誌系統架構解析「轉」

除了核心的抽象,我們要構建一個服務。這個服務如上圖所示。在寫入端,我們加了一個服務叫“Write Proxy”,用來接收來自於不同源的寫入服務。它負責管理每個日誌的 ownership,並且在有 proxy server 宕機的情況下 failover 到其他 proxy server。需要強調的一點,在這裡使用的是 “ownership tracking” 而不是 “leadership election”,我們不需要像 consensus 算法那樣嚴格的 leadership 要求,因為 Apache BookKeeper 提供了內置的 fencing 機制保證多寫者的一致性。所以此時的 “Write Proxy” 更像是一個無狀態的服務 “stateless service”,可以隨時遷移和 failover。在讀方面,我們增加了一個服務叫做 “Read Proxy”。它用來緩存最近的數據,可以用來支持成百上千的 readers 讀取相同一個日誌。Write proxy 和 Read proxy 都是無狀態的服務。所以可以很容易地運行在像 Mesos,Docker 或者 Amazon EC2 這樣的集群環境中,實現 Auto-Scaling。同時使用這樣的分層架構,我們可以輕易地獨立地擴展服務層和存儲層。


Twitter高性能分佈式日誌系統架構解析「轉」


這就是 DistributedLog,Twitter 基於 BookKeeper 構建的分佈式日誌服務。它包含了我們認為作為日誌系統需要的核心功能,我們認為足以滿足支持不同的負載,從事務性的在線服務,實時的流式分析到離線的批處理。


Twitter高性能分佈式日誌系統架構解析「轉」


而對於其他特性,比如如何 partition 數據,如何 route 數據到不同的日誌,如何記錄每個 reader 的讀取位置,我們交由上層應用程序處理。不同的應用程序對於延時、一致性和有序性都有不同的需求。只要基礎設施是持久化、強一致性和嚴格有序的,那麼就很容易去支持所有其他應用。

DistributeLog 案例分享

我們已經運行 DistributedLog/BookKeeper 有三四年了。在上面的服務包括:Manhattan 數據庫,EventBus (我們自服務的 pubsub system,用於取代 Kafka),跨數據中心的數據庫複製,Twitter 搜索的 ingestion pipeline,持久化的 Deferred RPC 系統,用於存儲系統的 Sharding Service…我們現在正在全面取代已有的老系統 Kestrel (用於在線服務的 Queue)和 Kakfa(用於離線分析的Pub/Sub)。因為時間有限,我主要講解了 DistributedLog 和 BookKeeper 的大概。中間跳過了一些內容。我們相信 DistributedLog 是一個相對不錯的模塊化的架構。它適用於基於 Cloud Services (e.g Amazon EC2, Docker) 的公司,也適用於擁有自己數據中心,運行自己集群系統(e.g Mesos, Yarn) 的公司。我們計劃第一季度開源 DistributedLog 到 Apache 社區。


Twitter高性能分佈式日誌系統架構解析「轉」


DistributedLog 的架構可以運行在多機房,實現跨機房的強一致性。

Q & A

1、日誌順序方面,日誌的序列號(1,2,3,4……),是否使用了 Twitter 的 snowflake 服務?獲取序列號後再推送日誌?是上面提到的什麼組件做的?

沒有使用 Twitter 的 snowflake 服務。因為 Writer 是 single writer,存在 ownership。所有的寫會 forward 給 owner 進行序列化。


2、這是 Kafka 的替代產品嗎?

是的。Kafka 目前沒有被使用在數據庫日誌的場景。因為 Kafka 的每個 topic 對應一個文件,在 topic 數量特別多,且需要持久化的場景,Kafka 的性能比較差。很難適用於 Twitter 的多租戶場景。


3、請問是否研究過 ELK,請問在前面分享的架構中,哪個對應 ELK 中的 Logstash(或fluentd)部分?或是 BookKeeper 就是替換它的?

這裡的日誌就是數據庫的日誌。跟日常的文本日誌不一樣。在 ELK 架構中,E 是文本的索引,K 是 UI。這兩個部分不是 DistributedLog/BookKeeper 所解決的問題。DistributedLog/BookKeeper 可以作為 PUB/SUB 這樣的消息中間件來做日誌的中轉,也就是可以用在 L 的部分。


4、分享中提到的 Kestrel 和 Kafka 一個在線 ,一個離線,具體差異是什麼?

Kestrel 主要是 producer/consumer queue 的模型。而 Kafka 是 pub/sub 模型。Kestrel 支持 per item 的 transaction,粒度是 item。而 Kafka 的粒度是 partition。


5、Name Log 的具體機制是什麼樣的? Client 刪除日誌時怎樣保證與讀者和寫者不衝突?

Name Log 是 DistributedLog 提供的用戶接口。底層分塊成不同的 Ledgers 進行存儲。元數據記錄在 ZooKeeper。使用 ZooKeeper 的 CAS 操作和 notification 機制來協調。6、想多瞭解一下跨數據中心複製,感覺不好做。可否介紹一下?

這個問題比較寬泛。跨數據中心,可以是異步複製,也可以是同步複製。不同場景有不同的權衡。


7、如果 LAC 之後的那條記錄始終不能寫成功,是不是就阻塞在那裡,LAC 就沒法移動了?

這是一個很好的問題。Ensemble Change 能夠保證寫永遠 go through。所以 LAC 會被 update 到 bookies。讀方面的 Speculative 機制保證能讀到 LAC。


8、 這裡的 writer 是 Write Proxy 嗎?如果是的話,single writer 的吞吐量就是這個 ledger 的最大寫的吞吐量了吧,會不會成為瓶頸?

這裡的 Writer 是指 Write Proxy。首先,一個 Ledger 的吞吐量,取決於 Bookie 的磁盤/網絡帶寬。假設,Bookie 的網卡是 1Gbps,一塊磁盤作為日誌寫的磁盤,那麼在保證低延時的情況下,Bookie 的吞吐可以達到 50MB/s~70MB/s。在 BookKeeper,可以通過配置 Ledger 的 Ensemble Size, Write Quorum Size 和 Ack Quorum Size,通過 Stripping 寫的方式來提高 Ledger 的吞吐。比如,設置 Ensemble Size 為 6, Write Quorum Size 為 3, Ack Quorum Size 為 2。那麼吞吐量可以提高到 2 倍。這是 Ledger 內的 Scalability。

理論上,單個 Ledger 的吞吐可以隨著 Ensemble Size 進行擴展。但是,因為所有這個 Ledger 都 writes 都要到 Write Proxy,所以它還取決於 Write Proxy 的網絡帶寬和後端 Bookie 的磁盤帶寬,以及相應的副本數量。比如,Write Proxy 的網卡帶寬是 1Gbps,複本為 3,即使後端的 Bookie 的吞吐可以達到 50MB/s~70MBps,Write Proxy 也只能接受 1Gbps / 3 (~30MB/s-~40MB/s) 的數據。單個日誌的吞吐通常取決於物理機器的帶寬。但是整個系統的吞吐可以隨著日誌數量的增加來增加。比如 1 個日誌可以寫 10MB / s,那麼 100 個日誌可以寫 1GB/s。在 DistributedLog 層,我們不做 partition。我們把 partition 的 logic 交給上層應用。因為不同應用對於如何 partition 有不同需求。

9、 Failover 到其他 Proxy Server 時,如何繼續產生遞增的 Entry ID?在 failover 到其他 Proxy Server 時,DistributedLog 並不會複用上一個 Proxy Server 的 ledger。所以 failover 之後,它會關閉上個 Proxy Server 寫的 ledger,然後重新開一個 ledger 進行寫入。遞增的 Entry ID 是基於當前 ledger 生成的。從整個日誌的角度來看,<ledger> 構成了 unique 的記錄 ID。如果對於 consensus 算法有所瞭解,可能會知道 `epoch` 的概念。每個 epoch 會有一個 designated 的 leader。而在 DistributedLog 中,`ledger id` 其實扮演著 `epoch` 的概念。/<ledger>



原文地址:https://mp.weixin.qq.com/s?__biz=MzAwMDU1MTE1OQ==&mid=403051208&idx=1&sn=1694ac05acbcb5ca53c88bfac8a68856&scene=2&srcid=1224xZuQ9QQ4sRmiPVdHTppL&from=timeline&isappinstalled=0#wechat_redirect


分享到:


相關文章: