Flink 狀態管理與 Checkpoint 機制

一、狀態分類

相對於其他流計算框架,Flink 一個比較重要的特性就是其支持有狀態計算。即你可以將中間的計算結果進行保存,並提供給後續的計算使用:

具體而言,Flink 又將狀態 (State) 分為 Keyed State 與 Operator State。

1.1 算子狀態

算子狀態 (Operator State):顧名思義,狀態是和算子進行綁定的,一個算子的狀態不能被其他算子所訪問到。官方文檔上對 Operator State 的解釋是:each operator state is bound to one parallel operator instance,所以更為確切的說一個算子狀態是與一個併發的算子實例所綁定的,即假設算子的並行度是 2,那麼其應有兩個對應的算子狀態:

1.2 鍵控狀態

鍵控狀態 (Keyed State) :是一種特殊的算子狀態,即狀態是根據 key 值進行區分的,Flink 會為每類鍵值維護一個狀態實例。如下圖所示,每個顏色代表不同 key 值,對應四個不同的狀態實例。需要注意的是鍵控狀態只能在 KeyedStream 上進行使用,我們可以通過 stream.keyBy(...) 來得到 KeyedStream 。

二、狀態編程

2.1 鍵控狀態

Flink 提供了以下數據格式來管理和存儲鍵控狀態 (Keyed State):

•ValueState:存儲單值類型的狀態。可以使用 update(T) 進行更新,並通過 T value 進行檢索。•ListState:存儲列表類型的狀態。可以使用 add(T) 或 addAll(List) 添加元素;並通過 get 獲得整個列表。•ReducingState:用於存儲經過 ReduceFunction 計算後的結果,使用 add(T) 增加元素。•AggregatingState:用於存儲經過 AggregatingState 計算後的結果,使用 add(IN) 添加元素。•FoldingState:已被標識為廢棄,會在未來版本中移除,官方推薦使用 AggregatingState 代替。•MapState:維護 Map 類型的狀態。

以上所有增刪改查方法不必硬記,在使用時通過語法提示來調用即可。這裡給出一個具體的使用示例:假設我們正在開發一個監控系統,當監控數據超過閾值一定次數後,需要發出報警信息。這裡之所以要達到一定次數,是因為由於偶發原因,偶爾一次超過閾值並不能代表什麼,故需要達到一定次數後才觸發報警,這就需要使用到 Flink 的狀態編程。相關代碼如下:

<code>publicclassThresholdWarningextends/<code><code>RichFlatMapFunction<tuple2>, Tuple2<string>>> {/<string>/<tuple2>/<code>
<code>// 通過ListState來存儲非正常數據的狀態/<code><code>privatetransientListState<long> abnormalData;/<long>/<code><code>// 需要監控的閾值/<code><code>privateLong threshold;/<code><code>// 觸發報警的次數/<code><code>privateInteger numberOfTimes;/<code>
<code>ThresholdWarning(Long threshold, Integer numberOfTimes) {/<code><code>this.threshold = threshold;/<code><code>this.numberOfTimes = numberOfTimes;/<code><code>}/<code>


<code>@Override/<code><code>publicvoid open(Configuration parameters) {/<code><code>// 通過狀態名稱(句柄)獲取狀態實例,如果不存在則會自動創建/<code><code> abnormalData = getRuntimeContext.getListState(/<code><code>newListStateDescriptor<>("abnormalData", Long.class));/<code><code>}/<code>
<code>@Override/<code><code>publicvoid flatMap(Tuple2<string> value, Collector<tuple2>>> out)/<tuple2>/<string>/<code><code>throwsException{/<code><code>Long inputValue = value.f1;/<code><code>// 如果輸入值超過閾值,則記錄該次不正常的數據信息/<code><code>if(inputValue >= threshold) {/<code><code> abnormalData.add(inputValue);/<code><code>}/<code><code>ArrayList<long> list = Lists.newArrayList(abnormalData.get.iterator);/<long>/<code><code>// 如果不正常的數據出現達到一定次數,則輸出報警信息/<code><code>if(list.size >= numberOfTimes) {/<code><code>out.collect(Tuple2.of(value.f0 + " 超過指定閾值 ", list));/<code><code>// 報警信息輸出後,清空狀態/<code><code> abnormalData.clear;/<code><code>}/<code><code>}/<code><code>}/<code>

調用自定義的狀態監控,這裡我們使用 a,b 來代表不同類型的監控數據,分別對其數據進行監控:

<code>finalStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment;/<code><code>DataStreamSource<tuple2>> tuple2DataStreamSource = env.fromElements(/<tuple2>/<code><code>Tuple2.of("a", 50L), Tuple2.of("a", 80L), Tuple2.of("a", 400L),/<code><code>Tuple2.of("a", 100L), Tuple2.of("a", 200L), Tuple2.of("a", 200L),/<code><code>Tuple2.of("b", 100L), Tuple2.of("b", 200L), Tuple2.of("b", 200L),/<code><code>Tuple2.of("b", 500L), Tuple2.of("b", 600L), Tuple2.of("b", 700L));/<code><code>tuple2DataStreamSource/<code><code>.keyBy(0)/<code><code>.flatMap(newThresholdWarning(100L, 3)) // 超過100的閾值3次後就進行報警/<code><code>.printToErr;/<code><code>env.execute("Managed Keyed State");/<code>

輸出如下結果如下:

2.2 狀態有效期

以上任何類型的 keyed state 都支持配置有效期 (TTL) ,示例如下:

<code>StateTtlConfig ttlConfig = StateTtlConfig/<code><code>// 設置有效期為 10 秒/<code><code>.newBuilder(Time.seconds(10)) /<code><code>// 設置有效期更新規則,這裡設置為當創建和寫入時,都重置其有效期到規定的10秒/<code><code>.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) /<code><code>/*設置只要值過期就不可見,另外一個可選值是ReturnExpiredIfNotCleanedUp,/<code><code>代表即使值過期了,但如果還沒有被物理刪除,就是可見的*//<code><code>.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)/<code><code>.build;/<code><code>ListStateDescriptor<long> descriptor = newListStateDescriptor<>("abnormalData", Long.class);/<long>/<code><code>descriptor.enableTimeToLive(ttlConfig);/<code>

2.3 算子狀態

相比於鍵控狀態,算子狀態目前支持的存儲類型只有以下三種:

•ListState:存儲列表類型的狀態。•UnionListState:存儲列表類型的狀態,與 ListState 的區別在於:如果並行度發生變化,ListState 會將該算子的所有併發的狀態實例進行彙總,然後均分給新的 Task;而 UnionListState 只是將所有併發的狀態實例彙總起來,具體的劃分行為則由用戶進行定義。•BroadcastState:用於廣播的算子狀態。

這裡我們繼續沿用上面的例子,假設此時我們不需要區分監控數據的類型,只要有監控數據超過閾值並達到指定的次數後,就進行報警,代碼如下:


<code>publicclassThresholdWarningextendsRichFlatMapFunction<tuple2>, /<tuple2>/<code><code>Tuple2<string>>>> implementsCheckpointedFunction{/<string>/<code>
<code>// 非正常數據/<code><code>privateList<tuple2>> bufferedData;/<tuple2>/<code><code>// checkPointedState/<code><code>privatetransientListState<tuple2>> checkPointedState;/<tuple2>/<code><code>// 需要監控的閾值/<code><code>privateLong threshold;/<code><code>// 次數/<code><code>privateInteger numberOfTimes;/<code>
<code>ThresholdWarning(Long threshold, Integer numberOfTimes) {/<code><code>this.threshold = threshold;/<code><code>this.numberOfTimes = numberOfTimes;/<code><code>this.bufferedData = newArrayList<>;/<code><code>}/<code>
<code>@Override/<code><code>publicvoid initializeState(FunctionInitializationContext context) throwsException{/<code><code>// 注意這裡獲取的是OperatorStateStore/<code><code> checkPointedState = context.getOperatorStateStore./<code><code> getListState(newListStateDescriptor<>("abnormalData",/<code><code>TypeInformation.of(newTypeHint<tuple2>> {/<tuple2>/<code><code>})));/<code><code>// 如果發生重啟,則需要從快照中將狀態進行恢復/<code><code>if(context.isRestored) {/<code><code>for(Tuple2<string> element : checkPointedState.get) {/<string>/<code><code> bufferedData.add(element);/<code><code>}/<code><code>}/<code><code>}/<code>
<code>@Override/<code><code>publicvoid flatMap(Tuple2<string> value, /<string>/<code><code>Collector<tuple2>>>> out) {/<tuple2>/<code><code>Long inputValue = value.f1;/<code><code>// 超過閾值則進行記錄/<code><code>if(inputValue >= threshold) {/<code><code> bufferedData.add(value);/<code><code>}/<code><code>// 超過指定次數則輸出報警信息/<code><code>if(bufferedData.size >= numberOfTimes) {/<code><code>// 順便輸出狀態實例的hashcode/<code><code>out.collect(Tuple2.of(checkPointedState.hashCode + "閾值警報!", bufferedData));/<code><code> bufferedData.clear;/<code><code>}/<code><code>}/<code>
<code>@Override/<code><code>publicvoid snapshotState(FunctionSnapshotContext context) throwsException{/<code><code>// 在進行快照時,將數據存儲到checkPointedState/<code><code> checkPointedState.clear;/<code><code>for(Tuple2<string> element : bufferedData) {/<string>/<code><code> checkPointedState.add(element);/<code><code>}/<code><code>}/<code><code>}/<code>

調用自定義算子狀態,這裡需要將並行度設置為 1:

<code>finalStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment;/<code><code>// 開啟檢查點機制/<code><code>env.enableCheckpointing(1000);/<code><code>// 設置並行度為1/<code><code>DataStreamSource<tuple2>> tuple2DataStreamSource = env.setParallelism(1).fromElements(/<tuple2>/<code><code>Tuple2.of("a", 50L), Tuple2.of("a", 80L), Tuple2.of("a", 400L),/<code><code>Tuple2.of("a", 100L), Tuple2.of("a", 200L), Tuple2.of("a", 200L),/<code><code>Tuple2.of("b", 100L), Tuple2.of("b", 200L), Tuple2.of("b", 200L),/<code><code>Tuple2.of("b", 500L), Tuple2.of("b", 600L), Tuple2.of("b", 700L));/<code><code>tuple2DataStreamSource/<code><code>.flatMap(newThresholdWarning(100L, 3))/<code><code>.printToErr;/<code><code>env.execute("Managed Keyed State");/<code><code>}/<code>

此時輸出如下:

在上面的調用代碼中,我們將程序的並行度設置為 1,可以看到三次輸出中狀態實例的 hashcode 全是一致的,證明它們都同一個狀態實例。假設將並行度設置為 2,此時輸出如下:

可以看到此時兩次輸出中狀態實例的 hashcode 是不一致的,代表它們不是同一個狀態實例,這也就是上文提到的,一個算子狀態是與一個併發的算子實例所綁定的。同時這裡只輸出兩次,是因為在併發處理的情況下,線程 1 可能拿到 5 個非正常值,線程 2 可能拿到 4 個非正常值,因為要大於 3 次才能輸出,所以在這種情況下就會出現只輸出兩條記錄的情況,所以需要將程序的並行度設置為 1。

三、檢查點機制

3.1 CheckPoints

為了使 Flink 的狀態具有良好的容錯性,Flink 提供了檢查點機制 (CheckPoints) 。通過檢查點機制,Flink 定期在數據流上生成 checkpoint barrier ,當某個算子收到 barrier 時,即會基於當前狀態生成一份快照,然後再將該 barrier 傳遞到下游算子,下游算子接收到該 barrier 後,也基於當前狀態生成一份快照,依次傳遞直至到最後的 Sink 算子上。當出現異常後,Flink 就可以根據最近的一次的快照數據將所有算子恢復到先前的狀態。

3.2 開啟檢查點

默認情況下,檢查點機制是關閉的,需要在程序中進行開啟:

<code>// 開啟檢查點機制,並指定狀態檢查點之間的時間間隔/<code><code>env.enableCheckpointing(1000); /<code>
<code>// 其他可選配置如下:/<code><code>// 設置語義/<code><code>env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);/<code><code>// 設置兩個檢查點之間的最小時間間隔/<code><code>env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500);/<code><code>// 設置執行Checkpoint操作時的超時時間/<code><code>env.getCheckpointConfig.setCheckpointTimeout(60000);/<code><code>// 設置最大併發執行的檢查點的數量/<code><code>env.getCheckpointConfig.setMaxConcurrentCheckpoints(1);/<code><code>// 將檢查點持久化到外部存儲/<code><code>env.getCheckpointConfig.enableExternalizedCheckpoints(/<code><code>ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);/<code><code>// 如果有更近的保存點時,是否將作業回退到該檢查點/<code><code>env.getCheckpointConfig.setPreferCheckpointForRecovery(true);/<code>

3.3 保存點機制

保存點機制 (Savepoints) 是檢查點機制的一種特殊的實現,它允許你通過手工的方式來觸發 Checkpoint,並將結果持久化存儲到指定路徑中,主要用於避免 Flink 集群在重啟或升級時導致狀態丟失。示例如下:

觸發指定id的作業的Savepoint,並將結果存儲到指定目錄下

bin/flink savepoint :jobId [:targetDirectory] 複製代碼更多命令和配置可以參考官方文檔:savepoints

四、狀態後端

4.1 狀態管理器分類

默認情況下,所有的狀態都存儲在 JVM 的堆內存中,在狀態數據過多的情況下,這種方式很有可能導致內存溢出,因此 Flink 該提供了其它方式來存儲狀態數據,這些存儲方式統一稱為狀態後端 (或狀態管理器):

主要有以下三種:

•MemoryStateBackend

默認的方式,即基於 JVM 的堆內存進行存儲,主要適用於本地開發和調試。

•FsStateBackend

基於文件系統進行存儲,可以是本地文件系統,也可以是 HDFS 等分佈式文件系統。需要注意而是雖然選擇使用了 FsStateBackend ,但正在進行的數據仍然是存儲在 TaskManager 的內存中的,只有在 checkpoint 時,才會將狀態快照寫入到指定文件系統上。

•RocksDBStateBackend

RocksDBStateBackend 是 Flink 內置的第三方狀態管理器,採用嵌入式的 key-value 型數據庫 RocksDB 來存儲正在進行的數據。等到 checkpoint 時,再將其中的數據持久化到指定的文件系統中,所以採用 RocksDBStateBackend 時也需要配置持久化存儲的文件系統。之所以這樣做是因為 RocksDB 作為嵌入式數據庫安全性比較低,但比起全文件系統的方式,其讀取速率更快;比起全內存的方式,其存儲空間更大,因此它是一種比較均衡的方案。

4.2 配置方式

Flink 支持使用兩種方式來配置後端管理器:第一種方式:基於代碼方式進行配置,只對當前作業生效:

<code>// 配置 FsStateBackend/<code><code>env.setStateBackend(newFsStateBackend("hdfs://namenode:40010/flink/checkpoints"));/<code><code>// 配置 RocksDBStateBackend/<code><code>env.setStateBackend(newRocksDBStateBackend("hdfs://namenode:40010/flink/checkpoints"));/<code>

配置 RocksDBStateBackend 時,需要額外導入下面的依賴:


<code><dependency>/<code><code><groupid>org.apache.flink/<groupid>/<code><code><artifactid>flink-statebackend-rocksdb_2.11/<artifactid>/<code><code><version>1.9.0/<version>/<code><code>

第二種方式:基於 flink-conf.yaml 配置文件的方式進行配置,對所有部署在該集群上的作業都生效:

<code>state.backend: filesystem/<code><code>state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints/<code>

•Working with State•Checkpointing•Savepoints•State Backends•Fabian Hueske , Vasiliki Kalavri . 《Stream Processing with Apache Flink》. O'Reilly Media . 2019-4-30

本文轉自:https://juejin.im/post/5dd2661cf265da0bf175d5bb

如果覺得文章對你有幫助,請轉發朋友圈、點在看,讓更多人獲益,感謝您的支持!


END






Flink 系列文章