Kafka 如何做到 1 秒發佈百萬條消息

Kafka 是分佈式發佈-訂閱消息系統,是一個分佈式的,可劃分的,冗餘備份的持久性的日誌服務。它主要用於處理活躍的流式數據。

現在被廣泛地應用於構建實時數據管道和流應用的場景中,具有橫向擴展,容錯,快等優點,並已經運行在眾多大中型公司的生產環境中,成功應用於大數據領域,本文分享一下我所瞭解的 Kafka。

Kafka 高吞吐率性能揭秘

Kafka 的第一個突出特定就是“快”,而且是那種變態的“快”,在普通廉價的虛擬機器上,比如一般 SAS 盤做的虛擬機上,據 LINDEDIN 統計,最新的數據是每天利用 Kafka 處理的消息超過1萬億條,在峰值時每秒鐘會發布超過百萬條消息,就算是在內存和 CPU 都不高的情況下,Kafka 的速度最高可以達到每秒十萬條數據,並且還能持久化存儲。

作為消息隊列,要承接讀跟寫兩塊的功能,首先是寫,就是消息日誌寫入 Kafka,那麼,Kafka 在“寫”上是怎麼做到寫變態快呢?

Kafka 讓代碼飛起來之寫得快

首先,可以使用 Kafka 提供的生產端 API 發佈消息到 1 個或多個 Topic(主題)的一個(保證數據的順序)或者多個分區(並行處理,但不一定保證數據順序)。Topic 可以簡單理解成一個數據類別,是用來區分不同數據的。

Kafka 維護一個 Topic 中的分區 log,以順序追加的方式向各個分區中寫入消息,每個分區都是不可變的消息隊列。分區中的消息都是以 k-v 形式存在。

  • k 表示 offset,稱之為偏移量,一個 64 位整型的唯一標識,offset 代表了 Topic 分區中所有消息流中該消息的起始字節位置。
  • v 就是實際的消息內容,每個分區中的每個 offset 都是唯一存在的,所有分區的消息都是一次寫入,在消息未過期之前都可以調整 offset 來實現多次讀取。


以上提到 Kafka “快”的第一個因素:消息順序寫入磁盤。

我們知道現在的磁盤大多數都還是機械結構(SSD不在討論的範圍內),如果將消息以隨機寫的方式存入磁盤,就會按柱面、磁頭、扇區的方式進行(尋址過程),緩慢的機械運動(相對內存)會消耗大量時間,導致磁盤的寫入速度只能達到內存寫入速度的幾百萬分之一,為了規避隨機寫帶來的時間消耗,KAFKA採取順序寫的方式存儲數據,如下圖所示:


Kafka 如何做到 1 秒發佈百萬條消息


新來的消息只能追加到已有消息的末尾,並且已經生產的消息不支持隨機刪除以及隨機訪問,但是消費者可以通過重置 offset 的方式來訪問已經消費過的數據。

即使順序讀寫,過於頻繁的大量小 I/O 操作一樣會造成磁盤的瓶頸,所以 Kafka 在此處的處理是把這些消息集合在一起批量發送,這樣減少對磁盤 IO 的過度讀寫,而不是一次發送單個消息。

另一個是無效率的字節複製,尤其是在負載比較高的情況下影響是顯著的。為了避免這種情況,Kafka 採用由 Producer,broker 和 consumer 共享的標準化二進制消息格式,這樣數據塊就可以在它們之間自由傳輸,無需轉換,降低了字節複製的成本開銷。

同時,Kafka 採用了 MMAP(Memory Mapped Files,內存映射文件)技術。很多現代操作系統都大量使用主存做磁盤緩存,一個現代操作系統可以將內存中的所有剩餘空間用作磁盤緩存,而當內存回收的時候幾乎沒有性能損失。

由於 Kafka 是基於 JVM 的,並且任何與 Java 內存使用打過交道的人都知道兩件事:

▪ 對象的內存開銷非常高,通常是實際要存儲數據大小的兩倍;

▪ 隨著數據的增加,java的垃圾收集也會越來越頻繁並且緩慢。

基於此,使用文件系統,同時依賴頁面緩存就比使用其他數據結構和維護內存緩存更有吸引力:

▪ 不使用進程內緩存,就騰出了內存空間,可以用來存放頁面緩存的空間幾乎可以翻倍。

▪ 如果 Kafka 重啟,進行內緩存就會丟失,但是使用操作系統的頁面緩存依然可以繼續使用。

可能有人會問 Kafka 如此頻繁利用頁面緩存,如果內存大小不夠了怎麼辦?

Kafka 會將數據寫入到持久化日誌中而不是刷新到磁盤。實際上它只是轉移到了內核的頁面緩存。

利用文件系統並且依靠頁緩存比維護一個內存緩存或者其他結構要好,它可以直接利用操作系統的頁緩存來實現文件到物理內存的直接映射。完成映射之後對物理內存的操作在適當時候會被同步到硬盤上。

Kafka 讓代碼飛起來之讀得快

Kafka 除了接收數據時寫得快,另外一個特點就是推送數據時發得快。

Kafka 這種消息隊列在生產端和消費端分別採取的 push 和 pull 的方式,也就是你生產端可以認為 Kafka 是個無底洞,有多少數據可以使勁往裡面推送,消費端則是根據自己的消費能力,需要多少數據,你自己過來 Kafka 這裡拉取,Kafka 能保證只要這裡有數據,消費端需要多少,都儘可以自己過來拿。

▲零拷貝

具體到消息的落地保存,broker 維護的消息日誌本身就是文件的目錄,每個文件都是二進制保存,生產者和消費者使用相同的格式來處理。維護這個公共的格式並允許優化最重要的操作:網絡傳輸持久性日誌塊。 現代的 unix 操作系統提供一個優化的代碼路徑,用於將數據從頁緩存傳輸到 socket;在 Linux 中,是通過 sendfile 系統調用來完成的。Java 提供了訪問這個系統調用的方法:FileChannel.transferTo API。

要理解 senfile 的影響,重要的是要了解將數據從文件傳輸到 socket 的公共數據路徑,如下圖所示,數據從磁盤傳輸到 socket 要經過以下幾個步驟:

▪ 操作系統將數據從磁盤讀入到內核空間的頁緩存

▪ 應用程序將數據從內核空間讀入到用戶空間緩存中

▪ 應用程序將數據寫回到內核空間到 socket 緩存中

▪ 操作系統將數據從 socket 緩衝區複製到網卡緩衝區,以便將數據經網絡發出

這裡有四次拷貝,兩次系統調用,這是非常低效的做法。如果使用 sendfile,只需要一次拷貝就行:允許操作系統將數據直接從頁緩存發送到網絡上。所以在這個優化的路徑中,只有最後一步將數據拷貝到網卡緩存中是需要的。

常規文件傳輸和 zeroCopy 方式的性能對比:

Kafka 如何做到 1 秒發佈百萬條消息


假設一個 Topic 有多個消費者的情況, 並使用上面的零拷貝優化,數據被複制到頁緩存中一次,並在每個消費上重複使用,而不是存儲在存儲器中,也不在每次讀取時複製到用戶空間。這使得以接近網絡連接限制的速度消費消息。

這種頁緩存和 sendfile 組合,意味著 Kafka 集群的消費者大多數都完全從緩存消費消息,而磁盤沒有任何讀取活動。

▲批量壓縮

在很多情況下,系統的瓶頸不是 CPU 或磁盤,而是網絡帶寬,對於需要在廣域網上的數據中心之間發送消息的數據流水線尤其如此。所以數據壓縮就很重要。可以每個消息都壓縮,但是壓縮率相對很低。所以 Kafka 使用了批量壓縮,即將多個消息一起壓縮而不是單個消息壓縮。

Kafka 允許使用遞歸的消息集合,批量的消息可以通過壓縮的形式傳輸並且在日誌中也可以保持壓縮格式,直到被消費者解壓縮。

Kafka 支持 Gzip 和 Snappy 壓縮協議。

Kafka 數據可靠性深度解讀


Kafka 如何做到 1 秒發佈百萬條消息


Kafka 的消息保存在 Topic 中,Topic 可分為多個分區,為保證數據的安全性,每個分區又有多個 Replia。

▪ 多分區的設計的特點:

  1. 為了併發讀寫,加快讀寫速度;
  2. 是利用多分區的存儲,利於數據的均衡;
  3. 是為了加快數據的恢復速率,一但某臺機器掛了,整個集群只需要恢復一部分數據,可加快故障恢復的時間。

每個 Partition 分為多個 Segment,每個 Segment 有 .log 和 .index 兩個文件,每個 log 文件承載具體的數據,每條消息都有一個遞增的 offset,Index 文件是對 log 文件的索引,Consumer 查找 offset 時使用的是二分法根據文件名去定位到哪個 Segment,然後解析 msg,匹配到對應的 offset 的 msg。

每個 Partition 會在磁盤記錄一個 RecoveryPoint,,記錄已經 flush 到磁盤的最大 offset。當 broker 失敗重啟時,會進行 loadLogs。首先會讀取該 Partition 的 RecoveryPoint,找到包含 RecoveryPoint 的 segment 及以後的 segment, 這些 segment 就是可能沒有完全 flush 到磁盤 segments。然後調用 segment 的 recover,重新讀取各個 segment 的 msg,並重建索引。每次重啟 Kafka 的 broker 時,都可以在輸出的日誌看到重建各個索引的過程。

< 數據同步>

Producer 和 Consumer 都只與 Leader 交互,每個 Follower 從 Leader 拉取數據進行同步。

Kafka 如何做到 1 秒發佈百萬條消息


如上圖所示,ISR 是所有不落後的 replica 集合,不落後有兩層含義:距離上次 FetchRequest 的時間不大於某一個值或落後的消息數不大於某一個值,Leader失 敗後會從 ISR 中隨機選取一個 Follower 做 Leader,該過程對用戶是透明的。

當 Producer 向 Broker 發送數據時,可以通過 request.required.acks 參數設置數據可靠性的級別。

此配置是表明當一次 Producer 請求被認為完成時的確認值。特別是,多少個其他 brokers 必須已經提交了數據到它們的 log 並且向它們的 Leader 確認了這些信息。

▪典型的值:

0: 表示 Producer 從來不等待來自 broker 的確認信息。這個選擇提供了最小的時延但同時風險最大(因為當server宕機時,數據將會丟失)。

1:表示獲得 Leader replica 已經接收了數據的確認信息。這個選擇時延較小同時確保了 server 確認接收成功。

-1:Producer 會獲得所有同步 replicas 都收到數據的確認。同時時延最大,然而,這種方式並沒有完全消除丟失消息的風險,因為同步 replicas 的數量可能是 1。如果你想確保某些 replicas 接收到數據,那麼你應該在 Topic-level 設置中選項 min.insync.replicas 設置一下。

僅設置 acks= -1 也不能保證數據不丟失,當 ISR 列表中只有 Leader 時,同樣有可能造成數據丟失。要保證數據不丟除了設置 acks=-1,還要保證 ISR 的大小大於等於2。

▪具體參數設置:

request.required.acks:設置為 -1 等待所有 ISR 列表中的 Replica 接收到消息後採算寫成功。

min.insync.replicas:設置為 >=2,保證 ISR 中至少兩個 Replica。

Producer:要在吞吐率和數據可靠性之間做一個權衡。

Kafka 作為現代消息中間件中的佼佼者,以其速度和高可靠性贏得了廣大市場和用戶青睞,其中的很多設計理念都是非常值得我們學習的,本文所介紹的也只是冰山一角,希望能夠對大家瞭解 Kafka 有一定的作用。


分享到:


相關文章: