Flink Checkpoint機制原理剖析與參數配置

在 這篇文章中,我們介紹了Flink的狀態都是基於本地的,而Flink又是一個部署在多節點的分佈式引擎,分佈式系統經常出現進程被殺、節點宕機或網絡中斷等問題,那麼本地的狀態在遇到故障時如何保證不丟呢?Flink定期保存狀態數據到存儲上,故障發生後從之前的備份中恢復,整個被稱為Checkpoint機制,它為Flink提供了Exactly-Once的投遞保障。本文將介紹Flink的Checkpoint機制的原理。本文會使用多個概念:快照(Snapshot)、分佈式快照(Distributed Snapshot)、檢查點(Checkpoint)等,這些概念均指的是Flink的Checkpoint機制,讀者可以將這些概念等同看待。

Flink分佈式快照流程

首先我們來看一下一個簡單的Checkpoint的大致流程:

  1. 暫停處理新流入數據,將新數據緩存起來。
  2. 將算子子任務的本地狀態數據拷貝到一個遠程的持久化存儲上。
  3. 繼續處理新流入的數據,包括剛才緩存起來的數據。

Flink是在Chandy–Lamport算法[1]的基礎上實現的一種分佈式快照算法。在介紹Flink的快照詳細流程前,我們先要了解一下檢查點分界線(Checkpoint Barrier)的概念。如下圖所示,Checkpoint Barrier被插入到數據流中,它將數據流切分成段。Flink的Checkpoint邏輯是,一段新數據流入導致狀態發生了變化,Flink的算子接收到Checpoint Barrier後,對狀態進行快照。每個Checkpoint Barrier有一個ID,表示該段數據屬於哪次Checkpoint。如圖所示,當ID為n的Checkpoint Barrier到達每個算子後,表示要對n-1和n之間狀態的更新做快照。Checkpoint Barrier有點像Event Time中的Watermark,它被插入到數據流中,但並不影響數據流原有的處理順序。

Flink Checkpoint機制原理剖析與參數配置

接下來,我們構建一個並行數據流圖,用這個並行數據流圖來演示Flink的分佈式快照機制。這個數據流圖有兩個Source子任務,數據流會在這些並行算子上從Source流動到Sink。

Flink Checkpoint機制原理剖析與參數配置

首先,Flink的檢查點協調器(Checkpoint Coordinator)觸發一次Checkpoint(Trigger Checkpoint),這個請求會發送給Source的各個子任務。

Flink Checkpoint機制原理剖析與參數配置

各Source算子子任務接收到這個Checkpoint請求之後,會將自己的狀態寫入到狀態後端,生成一次快照,並且會向下遊廣播Checkpoint Barrier。

Flink Checkpoint機制原理剖析與參數配置

Source算子做完快照後,還會給Checkpoint Coodinator發送一個確認,告知自己已經做完了相應的工作。這個確認中包括了一些元數據,其中就包括剛才備份到State Backend的狀態句柄,或者說是指向狀態的指針。至此,Source完成了一次Checkpoint。跟Watermark的傳播一樣,一個算子子任務要把Checkpoint Barrier發送給所連接的所有下游算子子任務。

對於下游算子來說,可能有多個與之相連的上游輸入,我們將算子之間的邊稱為通道。Source要將一個ID為n的Checkpoint Barrier向所有下游算子廣播,這也意味著下游算子的多個輸入裡都有同一個Checkpoint Barrier,而且不同輸入裡Checkpoint Barrier的流入進度可能不同。Checkpoint Barrier傳播的過程需要進行對齊(Barrier Alignment),我們從數據流圖中截取一小部分來分析Checkpoint Barrier是如何在算子間傳播和對齊的。

Flink Checkpoint機制原理剖析與參數配置


如上圖所示,對齊分為四步:

  1. 算子子任務在某個輸入通道中收到第一個ID為n的Checkpoint Barrier,但是其他輸入通道中ID為n的Checkpoint Barrier還未到達,該算子子任務開始準備進行對齊。
  2. 算子子任務將第一個輸入通道的數據緩存下來,同時繼續處理其他輸入通道的數據,這個過程被稱為對齊。
  3. 第二個輸入通道的Checkpoint Barrier抵達該算子子任務,該算子子任務執行快照,將狀態寫入State Backend,然後將ID為n的Checkpoint Barrier向下遊所有輸出通道廣播。
  4. 對於這個算子子任務,快照執行結束,繼續處理各個通道中新流入數據,包括剛才緩存起來的數據。

數據流圖中的每個算子子任務都要完成一遍上述的對齊、快照、確認的工作,當最後所有Sink算子確認完成快照之後,說明ID為n的Checkpoint執行結束,Checkpoint Coordinator向State Backend寫入一些本次Checkpoint的元數據。

Flink Checkpoint機制原理剖析與參數配置

之所以要進行對齊,主要是為了保證一個Flink作業所有算子的狀態是一致的。也就是說,某個ID為n的Checkpoint Barrier從前到後流入所有算子子任務後,所有算子子任務都能將同樣的一段數據寫入快照。

快照性能優化方案

前面和大家分享了一致性快照的具體流程,這種方式保證了數據的一致性,但有一些潛在的問題:

  1. 每次進行Checkpoint前,都需要暫停處理新流入數據,然後開始執行快照,假如狀態比較大,一次快照可能長達幾秒甚至幾分鐘。
  2. Checkpoint Barrier對齊時,必須等待所有上游通道都處理完,假如某個上游通道處理很慢,這可能造成整個數據流堵塞。

針對這些問題Flink已經有了一些解決方案,並且還在不斷優化。

對於第一個問題,Flink提供了異步快照(Asynchronous Snapshot)的機制。當實際執行快照時,Flink可以立即向下廣播Checkpoint Barrier,表示自己已經執行完自己部分的快照。同時,Flink啟動一個後臺線程,它創建本地狀態的一份拷貝,這個線程用來將本地狀態的拷貝同步到State Backend上,一旦數據同步完成,再給Checkpoint Coordinator發送確認信息。拷貝一份數據肯定佔用更多內存,這時可以利用寫入時複製(Copy-on-Write)的優化策略。Copy-on-Write指:如果這份內存數據沒有任何修改,那沒必要生成一份拷貝,只需要有一個指向這份數據的指針,通過指針將本地數據同步到State Backend上;如果這份內存數據有一些更新,那再去申請額外的內存空間並維護兩份數據,一份是快照時的數據,一份是更新後的數據。

對於第二個問題,Flink允許跳過對齊這一步,或者說一個算子子任務不需要等待所有上游通道的Checkpoint Barrier,直接將Checkpoint Barrier廣播,執行快照並繼續處理後續流入數據。為了保證數據一致性,Flink必須將那些較慢的數據流中的元素也一起快照,一旦重啟,這些元素會被重新處理一遍。

State Backend

前面已經分享了Flink的快照機制,其中State Backend起到了持久化存儲數據的重要功能。Flink將State Backend抽象成了一種插件,並提供了三種State Backend,每種State Backend對數據的保存和恢復方式略有不同。接下來我們開始詳細瞭解一下Flink的State Backend。

MemoryStateBackend

從名字中可以看出,這種State Backend主要基於內存,它將數據存儲在Java的堆區。當進行分佈式快照時,所有算子子任務將自己內存上的狀態同步到JobManager的堆上,一個作業的所有狀態要小於JobManager的內存大小。這種方式顯然不能存儲過大的狀態數據,否則將拋出OutOfMemoryError異常。因此,這種方式只適合調試或者實驗,不建議在生產環境下使用。下面的代碼告知一個Flink作業使用內存作為State Backend,並在參數中指定了狀態的最大值,默認情況下,這個最大值是5MB。

env.setStateBackend(new MemoryStateBackend(MAX_MEM_STATE_SIZE)) 

如果不做任何配置,默認情況是使用內存作為State Backend。

FsStateBackend

這種方式下,數據持久化到文件系統上,文件系統包括本地磁盤、HDFS以及包括Amazon、阿里雲在內的雲存儲服務。使用時,我們要提供文件系統的地址,尤其要寫明前綴,比如:file://、hdfs://或s3://。此外,這種方式支持Asynchronous Snapshot,默認情況下這個功能是開啟的,可加快數據同步速度。

// 使用HDFS作為State Backendenv.setStateBackend(new FsStateBackend("hdfs://namenode:port/flink-checkpoints/chk-17/"))​// 使用阿里雲OSS作為State Backendenv.setStateBackend(new FsStateBackend("oss:///"))​// 使用Amazon作為State Backendenv.setStateBackend(new FsStateBackend("s3:///"))​// 關閉Asynchronous Snapshotenv.setStateBackend(new FsStateBackend(checkpointPath, false))

Flink的本地狀態仍然在TaskManager的內存堆區上,直到執行快照時狀態數據會寫到所配置的文件系統上。因此,這種方式能夠享受本地內存的快速讀寫訪問,也能保證大容量狀態作業的故障恢復能力。

RocksDBStateBackend

這種方式下,本地狀態存儲在本地的RocksDB上。RocksDB是一種嵌入式Key-Value數據庫,數據實際保存在本地磁盤上。比起FsStateBackend的本地狀態存儲在內存中,RocksDB利用了磁盤空間,所以可存儲的本地狀態更大。然而,每次從RocksDB中讀寫數據都需要進行序列化和反序列化,因此讀寫本地狀態的成本更高。快照執行時,Flink將存儲於本地RocksDB的狀態同步到遠程的存儲上,因此使用這種State Backend時,也要配置分佈式存儲的地址。Asynchronous Snapshot在默認情況也是開啟的。

此外,這種State Backend允許增量快照(Incremental Checkpoint),Incremental Checkpoint的核心思想是每次快照時只對發生變化的數據增量寫到分佈式存儲上,而不是將所有的本地狀態都拷貝過去。Incremental Checkpoint非常適合超大規模的狀態,快照的耗時將明顯降低,同時,它的代價是重啟恢復的時間更長。默認情況下,Incremental Checkpoint沒有開啟,需要我們手動開啟。

// 開啟Incremental Checkpointval enableIncrementalCheckpointing = trueenv.setStateBackend(new RocksDBStateBackend(checkpointPath, enableIncrementalCheckpointing))

相比FsStateBackend,RocksDBStateBackend能夠支持的本地和遠程狀態都更大,Flink社區已經有TB級的案例。

除了上述三種之外,開發者也可以自行開發State Backend的具體實現。

重啟恢復流程

Flink的重啟恢復邏輯相對比較簡單:

  1. 重啟應用,在集群上重新部署數據流圖。
  2. 從持久化存儲上讀取最近一次的Checkpoint數據,加載到個算子子任務上。
  3. 繼續處理新流入的數據。

這樣的機制可以保證Flink內部狀態的Excatly-Once一致性。至於端到端的Exactly-Once一致性,要根據Source和Sink的具體實現而定。當發生故障時,一部分數據有可能已經流入系統,但還未進行Checkpoint,Source的Checkpoint記錄了輸入的Offset;當重啟時,Flink能把最近一次的Checkpoint恢復到內存中,並根據Offset,讓Source從該位置重新發送一遍數據,以保證數據不丟不重。像Kafka等消息隊列是提供重發功能的,socketTextStream就不具有這種功能,也意味著不能保證Exactly-Once投遞保障。

Checkpoint相關配置

默認情況下,Checkpoint機制是關閉的,需要調用env.enableCheckpointing(n)來開啟,每隔n毫秒進行一次Checkpoint。Checkpoint是一種負載較重的任務,如果狀態比較大,同時n值又比較小,那可能一次Checkpoint還沒完成,下次Checkpoint已經被觸發,佔用太多本該用於正常數據處理的資源。增大n值意味著一個作業的Checkpoint次數更少,整個作業用於進行Checkpoint的資源更小,可以將更多的資源用於正常的流數據處理。同時,更大的n值意味著重啟後,整個作業需要從更長的Offset開始重新處理數據。

此外,還有一些其他參數需要配置,這些參數統一封裝在了CheckpointConfig裡:

val cpConfig: CheckpointConfig = env.getCheckpointConfig

默認的Checkpoint配置是支持Exactly-Once投遞的,這樣能保證在重啟恢復時,所有算子的狀態對任一條數據只處理一次。用上文的Checkpoint原理來說,使用Exactly-Once就是進行了Checkpoint Barrier對齊,因此會有一定的延遲。如果作業延遲小,那麼應該使用At-Least-Once投遞,不進行對齊,但某些數據會被處理多次。

// 使用At-Least-Onceenv.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)

如果一次Checkpoint超過一定時間仍未完成,直接將其終止,以免其佔用太多資源:

// 超時時間1小時env.getCheckpointConfig.setCheckpointTimeout(3600*1000)

如果兩次Checkpoint之間的間歇時間太短,那麼正常的作業可能獲取的資源較少,更多的資源被用在了Checkpoint上。對這個參數進行合理配置能保證數據流的正常處理。比如,設置這個參數為60秒,那麼前一次Checkpoint結束後60秒內不會啟動新的Checkpoint。這種模式只在整個作業最多允許1個Checkpoint時適用。

// 兩次Checkpoint的間隔為60秒env.getCheckpointConfig.setMinPauseBetweenCheckpoints(60*1000)

默認情況下一個作業只允許1個Checkpoint執行,如果某個Checkpoint正在進行,另外一個Checkpoint被啟動,新的Checkpoint需要掛起等待。

// 最多同時進行3個Checkpointenv.getCheckpointConfig.setMaxConcurrentCheckpoints(3)

如果這個參數大於1,將與前面提到的最短間隔相沖突。

Checkpoint的初衷是用來進行故障恢復,如果作業是因為異常而失敗,Flink會保存遠程存儲上的數據;如果開發者自己取消了作業,遠程存儲上的數據都會被刪除。如果開發者希望通過Checkpoint數據進行調試,自己取消了作業,同時希望將遠程數據保存下來,需要設置為:

// 作業取消後仍然保存Checkpointenv.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

RETAIN_ON_CANCELLATION模式下,用戶需要自己手動刪除遠程存儲上的Checkpoint數據。

默認情況下,如果Checkpoint過程失敗,會導致整個應用重啟,我們可以關閉這個功能,這樣Checkpoint失敗不影響作業的運行。

env.getCheckpointConfig.setFailOnCheckpointingErrors(false)


1. Leslie Lamport, K. Mani Chandy: Distributed Snapshots: Determining Global States of a Distributed System. In: ACM Transactions on Computer Systems 3. Nr. 1, Februar 1985.


分享到:


相關文章: