Flink 狀態(State)管理在推薦場景中的應用

導語

Flink 提供了靈活豐富的狀態管理,可輕鬆解決數據之間的關聯性。本文介紹了Flink 狀態(State)管理在推薦場景中的應用,大家結合自己的應用場景與業務邏輯,選擇合適的狀態管理。


背景

Flink作為純流式大數據實時計算引擎,較於Spark Streaming的微批處理引擎,不管是內存管理,多流合併,還是時間窗口,迭代處理上,Flink在實時計算場景更較適合。而Flink的State狀態管理,更是讓Flink在實時計算領域,更勝一籌。通過對Flink State狀態的靈活妙用,可以完美實現大數據下的實時數倉,實時畫像和實時數據監控等功能。


場景

最近在做推薦數據平臺,其中有一個場景需求是要實時統計最近1分鐘的UV、點擊量、真實曝光量和下發量等熱點數據,並可以在不同地域維度下做多維度查詢。通過對數據的實時跟蹤監控,可以精準迅速地獲悉推薦算法在不同地域投放後所產生的流量變化,從而優化對不同地域下用戶的精準推薦。

Flink 狀態(State)管理在推薦場景中的應用

問題&選型

我們在做場景分析的時候,發現有兩個問題需要解決。

首先是我們的數據來自於用戶對App的操作行為日誌,在這些埋點數據裡,有個字段localId(13位數字組成),該字段記錄了該用戶所在的位置編號,可以精確到區,街道,甚至村委會,但是缺少上下層級關係。也就是說,通過localId我們無法得知該用戶屬於哪個鎮,哪個市,哪個省等。所以,在對該數據做進一步操作前,需要找到localId的地域映射關係,並關聯到省市縣等,從而實現省市縣下不同維度下的熱點數據統計與分析。

另外就是對於埋點上報的真實曝光數據,存在較嚴重的數據延遲問題,甚至可達到數個小時的延遲,嚴重影響數據的準確性和時效性。這部分的原因是真實曝光的定義與App客戶端的埋點上報機制所致。

結合上述問題,在構思方案的選型上:

首先想到使用Spark做數據處理引擎,不管從公司使用的人數和任務數上,還有維護上,使用Spark無疑是最穩定的選擇。但是Spark是基於RDD做的micro batch處理,而Spark Streaming又只是在Spark RDD基礎上增加了時間維度(時間片),其本質還是在進行Spark的RDD處理。Spark Streaming將流式計算分解成了多個Spark Job,而每一段數據的處理都會經過RDD DAG有向無環圖的分解,和Spark Scheduler的調度分配,其最小的Batch Size為0.5秒~2秒鐘之間。所以,Spark Streaming適用於對實時性要求不是非常高的流式準實時計算場景。而我們的數據有一部分是實時上報上來,例如點擊與下發數據。我們希望對這些數據做秒級內處理,所以在處理這種延時性較低的數據上,Spark Streaming可能不是很適合。並且,一天不同時間段訪問UV量參差不齊,導致一天內不同時間段的流量峰值或高或低,而且節假日期間的流量更是不可預測。Spark Streaming從v1.5才開始引進反壓機制(back-pressure),而且也只是估計當前系統處理數據的速率,再調節系統接收數據的速率與之匹配,無法實現動態反壓。而Spark在流式計算上的缺陷,正是Flink的優勢。與Spark基於RDD計算不同,Flink是基於有狀態流的計算,並提供了更豐富靈活的狀態用以保存狀態數據,我們就用到了Flink Stream的Broadcast State解決了localId的地域映射關係。並且Flink對數據流也是逐條處理,在低延時上明顯優於Spark Streaming。Flink在1.5之後,採用Credit-based的網絡流控制機制,對運行時的Task有著天然流控,慢的數據sink節點會反壓快的數據源source。系統能接收數據的前提是接收數據的Task必須有空閒可用的Buffer,而數據被繼續處理的前提是下游Task也有空閒可用的Buffer,只有下游的Task有了空閒的Buffer,才能消費上游Task的Buffer。所以,Flink的反壓,是系統接收數據的速率和處理數據速率的自然匹配。

其次,我們在專門基於流處理框架Storm和Flink中做了比較,雖然Storm在延遲處理上優於Flink,但從吞吐量,資源動態調整,SQL支持,狀態管理,容錯機制和社區活躍度等來看,Flink都明顯優於Storm。特別是Storm沒有任何對狀態的支持,需要依賴其他組件實現狀態管理。最重要的,Flink在公司內部有專門的WStream平臺,並由專業的團隊維護。

所以,我們選擇了Flink做數據流處理框架,而基於真實曝光數據上報延遲較嚴重問題,我們選擇Druid這種時序性的數據庫作為數據存儲,在保證數據不丟失的前提下,還能做到數據的近實時聚合查詢。


機制原理

Spark有廣播變量,把地域映射表數據直接Broadcast共享到到各個Worker Node的內存中,接下來的Operator操作都可以基於各自所在Worker節點已拷貝到內存中僅一份的地域映射表數據進行操作。Flink Datastreaming也有類似Spark的Broadcast廣播變量,都實現了節約內存和共享變量的作用。但其機制原理與使用方法與Spark 廣播變量截然不同。Flink Streaming的Broadcast作為Flink State的一種,類似Hadoop的分佈式緩存,Flink會複製文件或者目錄到所有Worker Node的本地文件系統,讓並行運行實例的函數可以本地訪問。為保證每個節點獲取到的Broadcast State一致,Worker Node中的Broadcast State並不會相互傳播通信,也不會被修改,且同一個Worker Node的所有Task可以共享廣播狀態。這個功能被常用來緩存不大且不可變的靜態數據,例如地域映射表或者機器學習的邏輯迴歸模型等。而在使用方法上,Flink DataStreaming需要定義StateDescriptor來廣播狀態到各個Worker Node。而每個Task在處理數據時,通過StateDescriptor就可以獲取緩存在本Worker Node的廣播狀態,相對Spark 廣播變量API的使用較複雜一些。

Flink 狀態(State)管理在推薦場景中的應用

不僅有Broadcast State,根據數據集是否按照Key分區,Flink可以將狀態分為Keyed State和Operator State(No-Keyed State)兩種類型,而這兩種狀態類型又均具有兩種形式,分別是託管狀態(Managed State)和原生狀態(Raw State)。區別在於託管狀態(Managed State)是由Flink Runtime控制和管理的狀態數據,並將狀態數據轉化存儲在Java Heap內存的Hash Table或RocksDB,然後將這些狀態數據通過內部的CheckpointedFunction接口持久化到Checkpoints中,而狀態的一致性,其實也是通過Checkpoints實現。因為有第三方RocksDB數據庫的參與,可以把State數據暫存RocksDB數據庫中,相比存於Java Heap中更安全。如果開啟Checkpoint增量機制,新產生的數據會替換之前產生的文件COPY到持久化中,以此減少COPY的數據量,並提高性能,更適合在生產環境使用。但目前為止,RocksDB還不支持Broadcast State。

當任務出現異常退出時,也可以通過這些狀態數據進行恢復,讀取已經Checkpoints的狀態數據,可以還原任務失敗前的狀態,包括記錄已經消費過的Kafka偏移量,以此實現容錯機制。當任務從狀態數據恢復時,可以繼續從未消費的Kafka偏移量開始讀取數據,從而實現Flink Source端Exactly-Once語義。

原生狀態(Raw State)由算子自己管理數據結構,當觸發Checkpoint過程時,只是將數據轉化成字節碼數據存在Checkpoints中,當從Checkpoints恢復任務時,算子再自己反序列化出狀態的數據結構,常用於自定義算子操作中。雖然兩者都可以實現狀態的管理和存儲,但託管狀態可以更好地支持狀態數據的重平衡以及更加完善的內存管理,經常被使用。

託管狀態(Managed State)已經有了官方實現好的幾種狀態,可以根據實際場景與業務邏輯選擇使用,例如BroadcastState,ValueState[T],ListState[T],ReducingState[T],AggregatingState[IN,OUT],MapState[UK,UV],而所有的託管狀態都需要通過創建StateDescriptor來獲取響應的State的操作類,該描述符主要定義了狀態的名稱,狀態中的數據類型參數信息以及狀態自定義函數,方便Flink的序列化與反序列化。每種託管狀態都有對應的描述符,例如ValueStateDescriptor,ListStateDescriptor,ReducingStateDescriptor,FoldingStateDescriptor,MapStateDescriptor等。而且通過對Flink State設置TTL,自定義超時清理時間,在狀態量越來越大的情形,可對內存做著優化處理的同時,還不會影響性能。


數據流程處理

我們的數據處理是基於Lambda架構搭建,數據暫存於Kafka消息隊列,分流供離線與實時做不同處理。

Flink 狀態(State)管理在推薦場景中的應用

Flink在處理數據流程上層次分明,Source(數據源)->Channel(數據處理)->Sink(數據存儲)。

首先,Flink Source端接入已經存入Kafka消息隊列的數據,這裡面的數據可以是經過ETL處理後的JSON格式數據。

其次,使用Flink對接入的數據做operator操作,並將處理結果Sink暫存到另一個Kafka消息隊列,供Druid做Pull拉取操作。

最後,基於Druid做數據可視化操作或供其他接口調用。

下面是針對具體場景的代碼實現:

首先,讀取存於HDFS的地域表數據,並把表數據轉換成自定義LocalContainerJava對象流。

Flink 狀態(State)管理在推薦場景中的應用


Flink 狀態(State)管理在推薦場景中的應用

其次,定義MapStateDescriptor描述符,並把轉換的表數據對象流廣播出去,描述符必須和表數據對象流格式相對應。

Flink 狀態(State)管理在推薦場景中的應用

然後,使用connect將BroadcastStream 與Kafka傳來的數據流合併,因為這兩種流數據類型不同,所以使用connect多流合併,返回BroadcastConnectedStream。Flink的多流合併還有union和join等,union要求連接的流數據類型必須一致,join則要求每個流必須有key且key值相同才能完成關聯操作。之後調用Process方法分別對各流做具體邏輯操作。因為我們的場景並不需要對數據集做Key分區,所以在Process方法裡傳入抽象類BroadcastProcessFunction的參數實現。

在抽象類BroadcastProcessFunction裡,這裡必須重寫兩個方法,一個是processBroadcastElement,一個是processElement,前者用以處理廣播流,後者用以直接處理從Kafka讀取的數據流,當然也可以重寫 open和close方法,做一些初始化與收尾操作,具體根據自己的應用場景的需要來決定。

Flink 狀態(State)管理在推薦場景中的應用

在processBroadcastElement方法,通過上下文Context傳入之前定義Broadcast State的描述符,獲取到BroadcastState操作類,並對BroadcastState做put鍵值對操作。這裡的put操作並不會影響其他TaskManager節點的Broadcast State數據,只會作用於當前節點。這點類似定義本地全局HashMap,只是這裡TaskManager把這些狀態數據轉換成內存Hash Table存儲,並Checkpoint到JobManager,最後JobManager根據配置信息setStateBackend存儲Checkpoints數據。這裡,Flink RunTime幫我們做了內部具體實現,讓我們可以只關注具體業務邏輯,而不必考慮數據在節點間的傳輸和序列化等問題。

Flink 狀態(State)管理在推薦場景中的應用

而processElement方法,可以對MapState進行各種操作。類似HashMap接口,MapState可以通過entries(),keys(),values()獲取對應的keys或values的集合。然後把結果集collect到下游算子。

最後,把結果數據集做Sink處理。這裡可以把Flink做完的聚合統計結果,直接存入第三方存儲裡(例如Kafka,Redis甚至Mysql等),只要把Sink定義成相應的connector即可。

這裡,我們沒有使用Flink做聚合操作的原因,從Kafka傳來的部分數據,不可避免出現延遲時間問題,甚至有些數據延遲達數個小時以上。Flink可以解決亂序問題,但是對於延時過長的數據,藉助其他大數據組件是更好的選擇。同時也是因為部分數據數據延時過長的原因,我們使用Flink默認的ProcessTime,以Flink處理時間為準。因為不需要Flink做聚合操作,所以也就沒有自定義Window。

最後把數據傳入第三方大數據組件Druid,我們在Druid裡做聚合查詢操作。而Druid使用的是數據的EventTime,通過把數據存入Druid時序數據庫的不同時間Segment,就解決了數據延遲時間參差不齊的問題,實時性和性能都有提高。隨著時間的推移,遲到數據陸續存入不同時間的Segment,準確度越來越高,遲到的數據會不斷更新最後的結果,解決埋點數據上報的延遲問題。


經驗總結

無論是在實時還是離線場景,數據之間難免會有關聯。Flink 提供了靈活豐富的狀態管理,可輕鬆解決數據之間的關聯性。結合自己的應用場景與業務邏輯,選擇合適的狀態管理。

Flink雖然可以處理大多數實時計算場景,但對某些特殊場景,可能並不是特別適合,或者處理起來較複雜。如果可以參考其他大數據組件,與Flink相互結合使用,無論從代碼開發,還是預期效果上,可能會事半功倍。


參考文獻

Apache Flink官方文檔:https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/state.html

代松辰,高級大數據開發工程師,現就職於58同鎮算法技術部。


分享到:


相關文章: