Flink 狀態管理與 Checkpoint 機制

一、狀態分類

相對於其他流計算框架,Flink 一個比較重要的特性就是其支持有狀態計算。即你可以將中間的計算結果進行保存,並提供給後續的計算使用:

Flink 状态管理与 Checkpoint 机制

具體而言,Flink 又將狀態 (State) 分為 Keyed State 與 Operator State。

1.1 算子狀態

算子狀態 (Operator State):顧名思義,狀態是和算子進行綁定的,一個算子的狀態不能被其他算子所訪問到。官方文檔上對 Operator State 的解釋是:each operator state is bound to one parallel operator instance,所以更為確切的說一個算子狀態是與一個併發的算子實例所綁定的,即假設算子的並行度是 2,那麼其應有兩個對應的算子狀態:

Flink 状态管理与 Checkpoint 机制

1.2 鍵控狀態

鍵控狀態 (Keyed State) :是一種特殊的算子狀態,即狀態是根據 key 值進行區分的,Flink 會為每類鍵值維護一個狀態實例。如下圖所示,每個顏色代表不同 key 值,對應四個不同的狀態實例。需要注意的是鍵控狀態只能在 KeyedStream 上進行使用,我們可以通過 stream.keyBy(...) 來得到 KeyedStream 。

Flink 状态管理与 Checkpoint 机制

二、狀態編程

2.1 鍵控狀態

Flink 提供了以下數據格式來管理和存儲鍵控狀態 (Keyed State):

•ValueState:存儲單值類型的狀態。可以使用 update(T) 進行更新,並通過 T value 進行檢索。•ListState:存儲列表類型的狀態。可以使用 add(T) 或 addAll(List) 添加元素;並通過 get 獲得整個列表。•ReducingState:用於存儲經過 ReduceFunction 計算後的結果,使用 add(T) 增加元素。•AggregatingState:用於存儲經過 AggregatingState 計算後的結果,使用 add(IN) 添加元素。•FoldingState:已被標識為廢棄,會在未來版本中移除,官方推薦使用 AggregatingState 代替。•MapState:維護 Map 類型的狀態。

以上所有增刪改查方法不必硬記,在使用時通過語法提示來調用即可。這裡給出一個具體的使用示例:假設我們正在開發一個監控系統,當監控數據超過閾值一定次數後,需要發出報警信息。這裡之所以要達到一定次數,是因為由於偶發原因,偶爾一次超過閾值並不能代表什麼,故需要達到一定次數後才觸發報警,這就需要使用到 Flink 的狀態編程。相關代碼如下:

<code>publicclassThresholdWarningextends/<code><code>RichFlatMapFunction<tuple2>, Tuple2<string>>> {/<string>/<tuple2>/<code>
<code>// 通過ListState來存儲非正常數據的狀態/<code><code>privatetransientListState<long> abnormalData;/<long>/<code><code>// 需要監控的閾值/<code><code>privateLong threshold;/<code><code>// 觸發報警的次數/<code><code>privateInteger numberOfTimes;/<code>
<code>ThresholdWarning(Long threshold, Integer numberOfTimes) {/<code><code>this.threshold = threshold;/<code><code>this.numberOfTimes = numberOfTimes;/<code><code>}/<code>
<code>@Override/<code><code>publicvoid open(Configuration parameters) {/<code><code>// 通過狀態名稱(句柄)獲取狀態實例,如果不存在則會自動創建/<code><code> abnormalData = getRuntimeContext.getListState(/<code><code>newListStateDescriptor<>("abnormalData", Long.class));/<code><code>}/<code>

<code>@Override/<code><code>publicvoid flatMap(Tuple2<string> value, Collector<tuple2>>> out)/<tuple2>/<string>/<code><code>throwsException{/<code><code>Long inputValue = value.f1;/<code><code>// 如果輸入值超過閾值,則記錄該次不正常的數據信息/<code><code>if(inputValue >= threshold) {/<code><code> abnormalData.add(inputValue);/<code><code>}/<code><code>ArrayList<long> list = Lists.newArrayList(abnormalData.get.iterator);/<long>/<code><code>// 如果不正常的數據出現達到一定次數,則輸出報警信息/<code><code>if(list.size >= numberOfTimes) {/<code><code>out.collect(Tuple2.of(value.f0 + " 超過指定閾值 ", list));/<code><code>// 報警信息輸出後,清空狀態/<code><code> abnormalData.clear;/<code><code>}/<code><code>}/<code><code>}/<code>

調用自定義的狀態監控,這裡我們使用 a,b 來代表不同類型的監控數據,分別對其數據進行監控:

<code>finalStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment;/<code><code>DataStreamSource<tuple2>> tuple2DataStreamSource = env.fromElements(/<tuple2>/<code><code>Tuple2.of("a", 50L), Tuple2.of("a", 80L), Tuple2.of("a", 400L),/<code><code>Tuple2.of("a", 100L), Tuple2.of("a", 200L), Tuple2.of("a", 200L),/<code><code>Tuple2.of("b", 100L), Tuple2.of("b", 200L), Tuple2.of("b", 200L),/<code><code>Tuple2.of("b", 500L), Tuple2.of("b", 600L), Tuple2.of("b", 700L));/<code><code>tuple2DataStreamSource/<code><code>.keyBy(0)/<code><code>.flatMap(newThresholdWarning(100L, 3)) // 超過100的閾值3次後就進行報警/<code><code>.printToErr;/<code><code>env.execute("Managed Keyed State");/<code>

輸出如下結果如下:

Flink 状态管理与 Checkpoint 机制

2.2 狀態有效期

以上任何類型的 keyed state 都支持配置有效期 (TTL) ,示例如下:

<code>StateTtlConfig ttlConfig = StateTtlConfig/<code><code>// 設置有效期為 10 秒/<code><code>.newBuilder(Time.seconds(10)) /<code><code>// 設置有效期更新規則,這裡設置為當創建和寫入時,都重置其有效期到規定的10秒/<code><code>.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) /<code><code>/*設置只要值過期就不可見,另外一個可選值是ReturnExpiredIfNotCleanedUp,/<code><code>代表即使值過期了,但如果還沒有被物理刪除,就是可見的*//<code><code>.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)/<code><code>.build;/<code><code>ListStateDescriptor<long> descriptor = newListStateDescriptor<>("abnormalData", Long.class);/<long>/<code><code>descriptor.enableTimeToLive(ttlConfig);/<code>

2.3 算子狀態

相比於鍵控狀態,算子狀態目前支持的存儲類型只有以下三種:

•ListState:存儲列表類型的狀態。•UnionListState:存儲列表類型的狀態,與 ListState 的區別在於:如果並行度發生變化,ListState 會將該算子的所有併發的狀態實例進行彙總,然後均分給新的 Task;而 UnionListState 只是將所有併發的狀態實例彙總起來,具體的劃分行為則由用戶進行定義。•BroadcastState:用於廣播的算子狀態。

這裡我們繼續沿用上面的例子,假設此時我們不需要區分監控數據的類型,只要有監控數據超過閾值並達到指定的次數後,就進行報警,代碼如下:

 

<code>publicclassThresholdWarningextendsRichFlatMapFunction<tuple2>, /<tuple2>/<code><code>Tuple2<string>>>> implementsCheckpointedFunction{/<string>/<code>
<code>// 非正常數據/<code><code>privateList<tuple2>> bufferedData;/<tuple2>/<code><code>// checkPointedState/<code><code>privatetransientListState<tuple2>> checkPointedState;/<tuple2>/<code><code>// 需要監控的閾值/<code><code>privateLong threshold;/<code><code>// 次數/<code><code>privateInteger numberOfTimes;/<code>
<code>ThresholdWarning(Long threshold, Integer numberOfTimes) {/<code><code>this.threshold = threshold;/<code><code>this.numberOfTimes = numberOfTimes;/<code><code>this.bufferedData = newArrayList<>;/<code><code>}/<code>
<code>@Override/<code><code>publicvoid initializeState(FunctionInitializationContext context) throwsException{/<code><code>// 注意這裡獲取的是OperatorStateStore/<code><code> checkPointedState = context.getOperatorStateStore./<code><code> getListState(newListStateDescriptor<>("abnormalData",/<code><code>TypeInformation.of(newTypeHint<tuple2>> {/<tuple2>/<code><code>})));/<code><code>// 如果發生重啟,則需要從快照中將狀態進行恢復/<code><code>if(context.isRestored) {/<code><code>for(Tuple2<string> element : checkPointedState.get) {/<string>/<code><code> bufferedData.add(element);/<code><code>}/<code><code>}/<code><code>}/<code>
<code>@Override/<code><code>publicvoid flatMap(Tuple2<string> value, /<string>/<code><code>Collector<tuple2>>>> out) {/<tuple2>/<code><code>Long inputValue = value.f1;/<code><code>// 超過閾值則進行記錄/<code><code>if(inputValue >= threshold) {/<code><code> bufferedData.add(value);/<code><code>}/<code><code>// 超過指定次數則輸出報警信息/<code><code>if(bufferedData.size >= numberOfTimes) {/<code><code>// 順便輸出狀態實例的hashcode/<code><code>out.collect(Tuple2.of(checkPointedState.hashCode + "閾值警報!", bufferedData));/<code><code> bufferedData.clear;/<code><code>}/<code><code>}/<code>
<code>@Override/<code><code>publicvoid snapshotState(FunctionSnapshotContext context) throwsException{/<code><code>// 在進行快照時,將數據存儲到checkPointedState/<code><code> checkPointedState.clear;/<code><code>for(Tuple2<string> element : bufferedData) {/<string>/<code><code> checkPointedState.add(element);/<code><code>}/<code><code>}/<code><code>}/<code>

調用自定義算子狀態,這裡需要將並行度設置為 1:

<code>finalStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment;/<code><code>// 開啟檢查點機制/<code><code>env.enableCheckpointing(1000);/<code><code>// 設置並行度為1/<code><code>DataStreamSource<tuple2>> tuple2DataStreamSource = env.setParallelism(1).fromElements(/<tuple2>/<code><code>Tuple2.of("a", 50L), Tuple2.of("a", 80L), Tuple2.of("a", 400L),/<code><code>Tuple2.of("a", 100L), Tuple2.of("a", 200L), Tuple2.of("a", 200L),/<code><code>Tuple2.of("b", 100L), Tuple2.of("b", 200L), Tuple2.of("b", 200L),/<code><code>Tuple2.of("b", 500L), Tuple2.of("b", 600L), Tuple2.of("b", 700L));/<code><code>tuple2DataStreamSource/<code><code>.flatMap(newThresholdWarning(100L, 3))/<code><code>.printToErr;/<code><code>env.execute("Managed Keyed State");/<code><code>}/<code>

此時輸出如下:

Flink 状态管理与 Checkpoint 机制

在上面的調用代碼中,我們將程序的並行度設置為 1,可以看到三次輸出中狀態實例的 hashcode 全是一致的,證明它們都同一個狀態實例。假設將並行度設置為 2,此時輸出如下:

Flink 状态管理与 Checkpoint 机制

可以看到此時兩次輸出中狀態實例的 hashcode 是不一致的,代表它們不是同一個狀態實例,這也就是上文提到的,一個算子狀態是與一個併發的算子實例所綁定的。同時這裡只輸出兩次,是因為在併發處理的情況下,線程 1 可能拿到 5 個非正常值,線程 2 可能拿到 4 個非正常值,因為要大於 3 次才能輸出,所以在這種情況下就會出現只輸出兩條記錄的情況,所以需要將程序的並行度設置為 1。

三、檢查點機制

3.1 CheckPoints

為了使 Flink 的狀態具有良好的容錯性,Flink 提供了檢查點機制 (CheckPoints) 。通過檢查點機制,Flink 定期在數據流上生成 checkpoint barrier ,當某個算子收到 barrier 時,即會基於當前狀態生成一份快照,然後再將該 barrier 傳遞到下游算子,下游算子接收到該 barrier 後,也基於當前狀態生成一份快照,依次傳遞直至到最後的 Sink 算子上。當出現異常後,Flink 就可以根據最近的一次的快照數據將所有算子恢復到先前的狀態。

Flink 状态管理与 Checkpoint 机制

3.2 開啟檢查點

默認情況下,檢查點機制是關閉的,需要在程序中進行開啟:

<code>// 開啟檢查點機制,並指定狀態檢查點之間的時間間隔/<code><code>env.enableCheckpointing(1000); /<code>
<code>// 其他可選配置如下:/<code><code>// 設置語義/<code><code>env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);/<code><code>// 設置兩個檢查點之間的最小時間間隔/<code><code>env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500);/<code><code>// 設置執行Checkpoint操作時的超時時間/<code><code>env.getCheckpointConfig.setCheckpointTimeout(60000);/<code><code>// 設置最大併發執行的檢查點的數量/<code><code>env.getCheckpointConfig.setMaxConcurrentCheckpoints(1);/<code><code>// 將檢查點持久化到外部存儲/<code><code>env.getCheckpointConfig.enableExternalizedCheckpoints(/<code><code>ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);/<code><code>// 如果有更近的保存點時,是否將作業回退到該檢查點/<code><code>env.getCheckpointConfig.setPreferCheckpointForRecovery(true);/<code>

3.3 保存點機制

保存點機制 (Savepoints) 是檢查點機制的一種特殊的實現,它允許你通過手工的方式來觸發 Checkpoint,並將結果持久化存儲到指定路徑中,主要用於避免 Flink 集群在重啟或升級時導致狀態丟失。示例如下:

觸發指定id的作業的Savepoint,並將結果存儲到指定目錄下

bin/flink savepoint :jobId [:targetDirectory] 複製代碼更多命令和配置可以參考官方文檔:savepoints

四、狀態後端

4.1 狀態管理器分類

默認情況下,所有的狀態都存儲在 JVM 的堆內存中,在狀態數據過多的情況下,這種方式很有可能導致內存溢出,因此 Flink 該提供了其它方式來存儲狀態數據,這些存儲方式統一稱為狀態後端 (或狀態管理器):

Flink 状态管理与 Checkpoint 机制

主要有以下三種:

•MemoryStateBackend

默認的方式,即基於 JVM 的堆內存進行存儲,主要適用於本地開發和調試。

•FsStateBackend

基於文件系統進行存儲,可以是本地文件系統,也可以是 HDFS 等分佈式文件系統。需要注意而是雖然選擇使用了 FsStateBackend ,但正在進行的數據仍然是存儲在 TaskManager 的內存中的,只有在 checkpoint 時,才會將狀態快照寫入到指定文件系統上。

•RocksDBStateBackend

RocksDBStateBackend 是 Flink 內置的第三方狀態管理器,採用嵌入式的 key-value 型數據庫 RocksDB 來存儲正在進行的數據。等到 checkpoint 時,再將其中的數據持久化到指定的文件系統中,所以採用 RocksDBStateBackend 時也需要配置持久化存儲的文件系統。之所以這樣做是因為 RocksDB 作為嵌入式數據庫安全性比較低,但比起全文件系統的方式,其讀取速率更快;比起全內存的方式,其存儲空間更大,因此它是一種比較均衡的方案。

4.2 配置方式

Flink 支持使用兩種方式來配置後端管理器:第一種方式:基於代碼方式進行配置,只對當前作業生效:

<code>// 配置 FsStateBackend/<code><code>env.setStateBackend(newFsStateBackend("hdfs://namenode:40010/flink/checkpoints"));/<code><code>// 配置 RocksDBStateBackend/<code><code>env.setStateBackend(newRocksDBStateBackend("hdfs://namenode:40010/flink/checkpoints"));/<code>

配置 RocksDBStateBackend 時,需要額外導入下面的依賴:


<code><dependency>/<code><code><groupid>org.apache.flink/<groupid>/<code><code><artifactid>flink-statebackend-rocksdb_2.11/<artifactid>/<code><code><version>1.9.0/<version>/<code><code>

第二種方式:基於 flink-conf.yaml 配置文件的方式進行配置,對所有部署在該集群上的作業都生效:

<code>state.backend: filesystem/<code><code>state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints/<code>

•Working with State•Checkpointing•Savepoints•State Backends•Fabian Hueske , Vasiliki Kalavri . 《Stream Processing with Apache Flink》. O'Reilly Media . 2019-4-30

本文轉自:https://juejin.im/post/5dd2661cf265da0bf175d5bb

如果覺得文章對你有幫助,請轉發朋友圈、點在看,讓更多人獲益,感謝您的支持!


END






Flink 系列文章


分享到:


相關文章: