ProcessFunction:Flink最底層API使用案例詳解

之前提到的一些算子和函數能夠進行一些時間上的操作,但是不能獲取算子當前的Processing Time或者是Watermark時間戳,調用起來簡單但功能相對受限。如果想獲取數據流中Watermark的時間戳,或者在時間上前後穿梭,需要使用ProcessFunction系列函數,它們是Flink體系中最底層的API,提供了對數據流更細粒度的操作權限。Flink SQL是基於這些函數實現的,一些需要高度個性化的業務場景也需要使用這些函數。

目前,這個系列函數主要包括KeyedProcessFunction、ProcessFunction、CoProcessFunction、KeyedCoProcessFunction、ProcessJoinFunction和ProcessWindowFunction等多種函數,這些函數各有側重,但核心功能比較相似,主要包括兩點:

狀態:我們可以在這些函數中訪問和更新Keyed State 。定時器(Timer):像定鬧鐘一樣設置定時器,我們可以在時間維度上設計更復雜的業務邏輯。

狀態的介紹可以參考我的文章: ,這裡我們重點講解一下的使用ProcessFunction其他幾個特色功能。本文所有代碼都上傳到了我的github:https://github.com/luweizheng/flink-tutorials

Timer的使用方法

我們可以把Timer理解成一個鬧鐘,使用前先在Timer中註冊一個未來的時間,當這個時間到達,鬧鐘會“響起”,程序會執行一個回調函數,回調函數中執行一定的業務邏輯。這裡以KeyedProcessFunction為例,來介紹Timer的註冊和使用。

ProcessFunction有兩個重要的接口processElement和onTimer,其中processElement函數在源碼中的Java簽名如下:

<code>// 處理數據流中的一條元素public abstract void processElement(I value, Context ctx, Collector out)/<code>

processElement方法處理數據流中的一條元素,並通過Collector輸出出來。Context是它的區別於FlatMapFunction等普通函數的特色,開發者可以通過Context來獲取時間戳,訪問TimerService,設置Timer。

另外一個接口是onTimer:

<code>// 時間到達後的回調函數public void onTimer(long timestamp, OnTimerContext ctx, Collector out)/<code>

這是一個回調函數,當到了“鬧鐘”時間,Flink會調用onTimer,並執行一些業務邏輯。這裡也有一個參數OnTimerContext,它實際上是繼承了前面的Context,與Context幾乎相同。

使用Timer的方法主要邏輯為:

在processElement方法中通過Context註冊一個未來的時間戳t。這個時間戳的語義可以是Processing Time,也可以是Event Time,根據業務需求來選擇。在onTimer方法中實現一些邏輯,到達t時刻,onTimer方法被自動調用。

從Context中,我們可以獲取一個TimerService,這是一個訪問時間戳和Timer的接口。我們可以通過Context.timerService.registerProcessingTimeTimer或`Context.timerService.registerEventTimeTimer這兩個方法來註冊Timer,只需要傳入一個時間戳即可。我們可以通過Context.timerService.deleteProcessingTimeTimer和Context.timerService.deleteEventTimeTimer來刪除之前註冊的Timer。此外,還可以從中獲取當前的時間戳:Context.timerService.currentProcessingTime和Context.timerService.currentWatermark。從函數名看出,這裡都是兩兩出現的函數,兩個方法分別對應兩種時間語義。

注意,我們只能在KeyedStream上註冊Timer。每個Key下可以使用不同的時間戳註冊不同的Timer,但是每個Key的每個時間戳只能註冊一個Timer。如果想在一個DataStream上應用Timer,可以將所有數據映射到一個偽造的Key上,但這樣所有數據會流入一個算子子任務。

我們再次以股票股票交易場景來解釋如何使用Timer。一次股票交易包括:股票代號、時間戳、股票價格、成交量。我們現在想看一支股票10秒內是否一直連續上漲,如果一直上漲,則發送出一個提示。

<code>case class StockPrice(symbol: String, ts: Long, price: Double, volume: Int)class IncreaseAlertFunction(intervalMills: Long)extends KeyedProcessFunction[String, StockPrice, String] { // 狀態:保存某支股票上次交易價格 lazy val lastPrice: ValueState[Double] = getRuntimeContext.getState( new ValueStateDescriptor[Double]("lastPrice", Types.of[Double]) ) // 狀態:保存某支股票的定時器時間戳 lazy val currentTimer: ValueState[Long] = getRuntimeContext.getState( new ValueStateDescriptor[Long]("timer", Types.of[Long]) ) override def processElement(stock: StockPrice, context: KeyedProcessFunction[String, StockPrice, String]#Context, out: Collector[String]): Unit = { // 獲取lastPrice狀態中的數據,第一次使用時會被初始化為0 val prevPrice = lastPrice.value() // 更新lastPrice lastPrice.update(stock.price) val curTimerTimestamp = currentTimer.value() if (prevPrice == 0.0) { // 第一次使用,不做任何處理 } else if (stock.price < prevPrice) { // 如果新流入的股票價格降低,刪除Timer,否則該Timer一直保留 context.timerService().deleteEventTimeTimer(curTimerTimestamp) currentTimer.clear() } else if (stock.price >= prevPrice && curTimerTimestamp == 0) { // 如果新流入的股票價格升高 // curTimerTimestamp為0表示currentTimer狀態中是空的,還沒有對應的Timer // 新Timer = 當前時間 + interval val timerTs = context.timestamp() + intervalMills val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS") context.timerService().registerEventTimeTimer(timerTs) // 更新currentTimer狀態,後續數據會讀取currentTimer,做相關判斷 currentTimer.update(timerTs) } } override def onTimer(ts: Long, ctx: KeyedProcessFunction[String, StockPrice, String]#OnTimerContext, out: Collector[String]): Unit = { val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS") out.collect("time: " + formatter.format(ts) + ", symbol: '" + ctx.getCurrentKey + " monotonically increased for " + intervalMills + " millisecond.") // 清空currentTimer狀態 currentTimer.clear() }}/<code>

在主邏輯裡,通過下面的process算子調用KeyedProcessFunction:

<code>val inputStream: DataStream[StockPrice] = ...val warnings = inputStream .keyBy(stock => stock.symbol) // 調用process函數 .process(new IncreaseAlertFunction(10000))/<code>

Checkpoint時,Timer也會隨其他狀態數據一起保存起來。如果使用Processing Time語義設置一些Timer,重啟時這個時間戳已經過期,那些回調函數會立刻被調用執行。

側輸出SideOutput

ProcessFunction的另一大特色功能是可以將一部分數據發送到另外一個流中,而且輸出到的兩個流數據類型可以不一樣,我們通過OutputTag[T]來標記另外一個數據流。在ProcessFunction中這樣將某類數據過濾出來:

<code>class IncreaseAlertFunction(intervalMills: Long) extends KeyedProcessFunction[String, Stock, String] { override def processElement(stock: Stock, context: KeyedProcessFunction[String, Stock, String]#Context, out: Collector[String]): Unit = { // 其他業務邏輯... // 定義一個OutputTag,Stock為這個SideOutput流的數據類型 val highVolumeOutput: OutputTag[Stock] = new OutputTag[Stock]("high-volume-trade") if (stock.volume > 1000) { // 將Stock篩選出來發送到該OutputTag下 context.output(highVolumeOutput, stock) } }}/<code>

在主邏輯中,通過下面的方法獲取側輸出:

<code>// 收集SideOutputval outputTag: OutputTag[Stock] = OutputTag[Stock]("high-volume-trade")val sideOutputStream: DataStream[Stock] = mainStream.getSideOutput(outputTag)/<code>

從這個例子中可以看到,KeyedProcessFunction的輸出類型是String,而SideOutput的輸出類型是Stock,兩者可以不同。

使用ProcessFunction實現Join

如果想從更細的粒度上實現兩個數據流的Join,可以使用CoProcessFunction或KeyedCoProcessFunction。這兩個函數都有processElement1和processElement2方法,分別對第一個數據流和第二個數據流的每個元素進行處理。兩個數據流的數據類型以及輸出類型可以互不相同。儘管數據來自兩個不同的流,但是他們可以共享同樣的狀態,所以可以參考下面的邏輯來實現Join:

創建一到多個狀態,兩個數據流都能訪問到這些狀態,這裡以狀態a為例。processElement1方法處理第一個數據流,更新狀態a。processElement2方法處理第二個數據流,根據狀態a中的數據,生成相應的輸出。

我們這次將股票價格結合媒體評價兩個數據流一起討論,假設對於某支股票有一個媒體評價數據流,這個數據流包含了對該支股票的正負評價。兩支數據流一起流入KeyedCoProcessFunction,processElement2方法處理流入的媒體數據,將媒體評價更新到狀態mediaState上,processElement1方法處理流入的股票交易數據,獲取mediaState`狀態,生成到新的數據流。兩個方法分別處理兩個數據流,共享一個狀態,通過狀態來通信。

在主邏輯中,我們將兩個數據流connect,然後按照股票代號進行keyBy,進而使用process算子:

<code>val stockPriceRawStream: DataStream[StockPrice] = ...val mediaStatusStream: DataStream[Media] = ...val warnings = stockStream.connect(mediaStream) .keyBy(0, 0) // 調用process函數 .process(new AlertProcessFunction())/<code>

KeyedCoProcessFunction的具體實現:

<code>class JoinStockMediaProcessFunction extends KeyedCoProcessFunction[String, StockPrice, Media, StockPrice] { // mediaState private var mediaState: ValueState[String] = _ override def open(parameters: Configuration): Unit = { // 從RuntimeContext中獲取狀態 mediaState = getRuntimeContext.getState( new ValueStateDescriptor[String]("mediaStatusState", classOf[String])) } override def processElement1(stock: StockPrice, context: KeyedCoProcessFunction[String, StockPrice, Media, StockPrice]#Context, collector: Collector[StockPrice]): Unit = { val mediaStatus = mediaState.value() if (null != mediaStatus) { val newStock = stock.copy(mediaStatus = mediaStatus) collector.collect(newStock) } } override def processElement2(media: Media, context: KeyedCoProcessFunction[String, StockPrice, Media, StockPrice]#Context, collector: Collector[StockPrice]): Unit = { // 第二個流更新mediaState mediaState.update(media.status) }}/<code>

這個例子比較簡單,沒有使用Timer,實際的業務場景中狀態一般用到Timer將過期的狀態清除。很多互聯網APP的機器學習樣本拼接都可能依賴這個函數來實現:服務端的機器學習特徵是實時生成的,用戶在APP上的行為是交互後產生的,兩者屬於兩個不同的數據流,可以按照這個邏輯來將兩個數據流拼接起來,通過拼接更快得到下一輪機器學習的樣本數據。兩個數據流的中間數據放在狀態中,為避免狀態的無限增長,需要使用Timer將過期的狀態清除。

注意,使用Event Time時,兩個數據流必須都設置好Watermark,只設置一個流的Event Time和Watermark,無法在CoProcessFunction和KeyedCoProcessFunction中使用Timer功能,因為process算子無法確定自己應該以怎樣的時間來處理數據。