眾所周知 Flink 是當前廣泛使用的計算引擎,Flink 使用 checkpoint 機制進行容錯處理[1],Flink 的 checkpoint 會將狀態快照備份到分佈式存儲系統,供後續恢復使用。在 Alibaba 內部我們使用的存儲主要是 HDFS,當同一個集群的 Job 到達一定數量後,會對 HDFS 造成非常大的壓力,本文將介紹一種大幅度降低 HDFS 壓力的方法 -- 小文件合併。
背景
不管使用 FsStateBackend、RocksDBStateBackend 還是 NiagaraStateBackend,Flink 在進行 checkpoint 的時候,TM 會將狀態快照寫到分佈式文件系統中,然後將文件句柄發給 JM,JM 完成全局 checkpoint 快照的存儲,如下圖所示。
對於全量 checkpoint 來說,TM 將每個 checkpoint 內部的數據都寫到同一個文件,而對於 RocksDBStateBackend/NiagaraStateBackend 的增量 checkpoint [2]來說,則會將每個 sst 文件寫到一個分佈式系統的文件內。當作業量很大,且作業的併發很大時,則會對底層 HDFS 形成非常大的壓力:1)大量的 RPC 請求會影響 RPC 的響應時間(如下圖所示);2)大量文件對 NameNode 內存造成很大壓力。
在 Flink 中曾經嘗試使用 ByteStreamStateHandle 來解決小文件多的問題[3],將小於一定閾值的 state 直接發送到 JM,由 JM 統一寫到分佈式文件中,從而避免在 TM 端生成小文件。但是這個方案有一定的侷限性,閾值設置太小,還會有很多小文件生成,閾值設置太大,則會導致 JM 內存消耗太多有 OOM 的風險。
1 小文件合併方案
針對上面的問題我們提出一種解決方案 -- 小文件合併。
在原來的實現中,每個 sst 文件會打開一個
CheckpointOutputStream,每個 CheckpointOutputStream 對應一個 FSDataOutputStream,將本地文件寫往一個分佈式文件,然後關閉 FSDataOutputStream,生成一個 StateHandle。如下圖所示:
小文件合併則會重用打開的 FSDataOutputStream,直至文件大小達到預設的閾值為止,換句話說多個 sst 文件會重用同一個 DFS 上的文件,每個 sst 文件佔用 DFS 文件中的一部分,最終多個 StateHandle 共用一個物理文件,如下圖所示。
在接下來的章節中我們會描述實現的細節,其中需要重點考慮的地方包括:
- 併發 checkpoint 的支持
Flink 天生支持併發 checkpoint,小文件合併方案則會將多個文件寫往同一個分佈式存儲文件中,如果考慮不當,數據會寫串或者損壞,因此我們需要有一種機制保證該方案的正確性,詳細描述參考 2.1 節 - 防止誤刪文件
我們使用引用計數來記錄文件的使用情況,僅通過文件引用計數是否降為 0 進行判斷刪除,則可能誤刪文件,如何保證文件不會被錯誤刪除,我們將會在 2.2 節進行闡述 - 降低空間放大
使用小文件合併之後,只要文件中還有一個 statehandle 被使用,整個分佈式文件就不能被刪除,因此會佔用更多的空間,我們在 2.3 節描述瞭解決該問題的詳細方案 - 異常處理
我們將在 2.4 節闡述如何處理異常情況,包括 JM 異常和 TM 異常的情況 - 2.5 節中會詳細描述在 Checkpoint 被取消或者失敗後,如何取消 TM 端的 Snapshot,如果不取消 TM 端的 Snapshot,則會導致 TM 端實際運行的 Snapshot 比正常的多
在第 3 節中闡述了小文件合併方案與現有方案的兼容性;第 4 節則會描述小文件合併方案的優勢和不足;最後在第 5 節我們展示在生產環境下取得的效果。
2 設計實現
本節中我們會詳細描述整個小文件合併的細節,以及其中的設計要點。
這裡我們大致回憶一下 TM 端 Snapshot 的過程
- TM 端 barrier 對齊
- TM Snapshot 同步操作
- TM Snapshot 異步操作
其中上傳 sst 文件到分佈式存儲系統在上面的第三步,同一個 checkpoint 內的文件順序上傳,多個 checkpoint 的文件上傳可能同時進行。
2.1 併發 checkpoint 支持
Flink 天生支持併發 checkpoint,因此小文件合併方案也需要能夠支持併發 checkpoint,如果不同 checkpoint 的 sst 文件同時寫往一個分佈式文件,則會導致文件內容損壞,後續無法從該文件進行 restore。
在 FLINK-11937[4] 的提案中,我們會將每個 checkpoint 的 state 文件寫到同一個 HDFS 文件,不同 checkpoint 的 state 寫到不同的 HDFS 文件 -- 換句話說,HDFS 文件不跨 Checkpoint 共用,從而避免了多個客戶端同時寫入同一個文件的情況。
後續我們會繼續推進跨 Checkpoint 共用文件的方案,當然在跨 Checkpoint 共用文件的方案中,並行的 Checkpoint 也會寫往不同的 HDFS 文件。
2.2 防止誤刪文件
複用底層文件之後,我們使用引用計數追蹤文件的使用情況,在文件引用數降為 0 的情況下刪除文件。但是在某些情況下,文件引用數為 0 的時候,並不代表文件不會被繼續使用,可能導致文件誤刪。下面我們會詳>細描述開啟併發 checkpoint 後可能導致文件誤刪的情況,以及解決方案。
我們以下圖為例,maxConcurrentlyCheckpoint = 2
上圖中共有 3 個 checkpoint,其中 chk-1 已經完成,chk-2 和 chk-3 都基於 chk-1 進行,chk-2 在 chk-3 前完成,chk-3 在註冊 4.sst 的時候發現,發現 4.sst 在 chk-2 中已經註冊過,會重用 chk-2 中 4.sst 對應的 stateHandle,然後取消 chk-3 中的 4.sst 的註冊,並且刪除 stateHandle,在處理完 chk-3 中 4.sst 之後,該 stateHandle 對應的分佈式文件的引用計數為 0,如果我們這個時候刪除分佈式文件,則會同時刪除 5.sst 對應的內容,導致後續無法從 chk-3 恢復。
這裡的問題是如何在 stateHandle 對應的分佈式文件引用計數降為 0 的時候正確判斷是否還會繼續引用該文件,因此在整個 checkpoint 完成處理之後再判斷某個分佈式文件能否刪除,如果真個 checkpoint 完成發現文件沒有被引用,則可以安全刪除,否則不進行刪除。
2.3 降低空間放大
使用小文件合併方案後,每個 sst 文件對應分佈式文件中的一個 segment,如下圖所示
文件僅能在所有 segment 都不再使用時進行刪除,上圖中有 4 個 segment,僅 segment-4 被使用,但是整個文件都不能刪除,其中 segment[1-3] 的空間被浪費掉了,從實際生產環境中的數據可知,整體的空間放大率(實際佔用的空間 / 真實有用的空間)在 1.3 - 1.6 之間。
為了解決空間放大的問題,在 TM 端起異步線程對放大率超過閾值的文件進行壓縮。而且僅對已經關閉的文件進行壓縮。
整個壓縮的流程如下所示:
- 計算每個文件的放大率
- 如果放大率較小則直接跳到步驟 7
- 如果文件 A 的放大率超過閾值,則生成一個對應的新文件 A‘(如果這個過程中創建文件失敗,則由 TM 負責清理工作)
- 記錄 A 與 A’ 的映射關係
- 在下一次 checkpoint X 往 JM 發送落在文件 A 中的 StateHandle 時,則使用 A` 中的信息生成一個新的 StateHandle 發送給 JM
- checkpoint X 完成後,我們增加 A‘ 的引用計數,減少 A 的引用計數,在引用計數降為 0 後將文件 A 刪除(如果 JM 增加了 A’ 的引用,然後出現異常,則會從上次成功的 checkpoint 重新構建整個引用計數器)
- 文件壓縮完成
2.4 異常情況處理
在 checkpoint 的過程中,主要有兩種異常:JM 異常和 TM 異常,我們將分情況闡述。
2.4.1 JM 異常
JM 端主要記錄 StateHandle 以及文件的引用計數,引用計數相關數據不需要持久化到外存中,因此不需要特殊的處理,也不需要考慮 transaction 等相關操作,如果 JM 發送 failover,則可以直接從最近一次 complete checkpoint 恢復,並重建引用計數即可。
2.4.2 TM 異常
TM 異常可以分為兩種:1)該文件在之前 checkpoint 中已經彙報過給 JM;2)文件尚未彙報過給 JM,我們會分情況闡述。
- 文件已經彙報過給 JM
文件彙報過給 JM,因此在 JM 端有文件的引用計數,文件的刪除由 JM 控制,當文件的引用計數變為 0 之後,JM 將刪除該文件。 - 文件尚未彙報給 JM
該文件暫時尚未彙報過給 JM,該文件不再被使用,也不會被 JM 感知,成為孤兒文件。這種情況暫時有外圍工具統一進行清理。
2.5 取消 TM 端 snapshot
像前面章節所說,我們需要在 checkpoint 超時/失敗時,取消 TM 端的 snapshot,而 Flink 則沒有相應的通知機制,現在 FLINK-8871[5] 在追蹤相應的優化,我們在內部增加了相關實現,當 checkpoint 失敗時會發送 RPC 數據給 TM,TM 端接受到相應的 RPC 消息後,會取消相應的 snapshot。
3 兼容性
小文件合併功能支持從之前的版本無縫遷移過來。從之前的 checkpoint restore 的的步驟如下:
- 每個 TM 分到自己需要 restore 的 state handle
- TM 從遠程下載 state handle 對應的數據
- 從本地進行恢復
小文件合併主要影響的是第 2 步,從遠程下載對應數據的時候對不同的 StateHandle 進行適配,因此不影響整體的兼容性。
4 優勢和不足
- 優勢:大幅度降低 HDFS 的壓力:包括 RPC 壓力以及 NameNode 內存的壓力
- 不足:不支持 State 多線程上傳的功能(State 上傳暫時不是 checkpoint 的瓶頸)
5 線上環境的結果
在該方案上線後,對 Namenode 的壓力大幅降低,下面的截圖來自線上生產集群,從數據來看,文件創建和關閉的 RPC 有明顯下降,RPC 的響應時間也有大幅度降低,確保順利度過雙十一。
閱讀更多 阿里云云棲號 的文章