ProcessFunction:Flink最底層API使用案例詳解

之前提到的一些算子和函數能夠進行一些時間上的操作,但是不能獲取算子當前的Processing Time或者是Watermark時間戳,調用起來簡單但功能相對受限。如果想獲取數據流中Watermark的時間戳,或者在時間上前後穿梭,需要使用ProcessFunction系列函數,它們是Flink體系中最底層的API,提供了對數據流更細粒度的操作權限。Flink SQL是基於這些函數實現的,一些需要高度個性化的業務場景也需要使用這些函數。

ProcessFunction:Flink最底層API使用案例詳解

目前,這個系列函數主要包括KeyedProcessFunction、ProcessFunction、CoProcessFunction、KeyedCoProcessFunction、ProcessJoinFunction和ProcessWindowFunction等多種函數,這些函數各有側重,但核心功能比較相似,主要包括兩點:

  • 狀態:我們可以在這些函數中訪問和更新Keyed State 。
  • 定時器(Timer):像定鬧鐘一樣設置定時器,我們可以在時間維度上設計更復雜的業務邏輯。

狀態的介紹可以參考我的文章: ,這裡我們重點講解一下的使用ProcessFunction其他幾個特色功能。本文所有代碼都上傳到了我的github:https://github.com/luweizheng/flink-tutorials

Timer的使用方法

我們可以把Timer理解成一個鬧鐘,使用前先在Timer中註冊一個未來的時間,當這個時間到達,鬧鐘會“響起”,程序會執行一個回調函數,回調函數中執行一定的業務邏輯。這裡以KeyedProcessFunction為例,來介紹Timer的註冊和使用。

ProcessFunction有兩個重要的接口processElement和onTimer,其中processElement函數在源碼中的Java簽名如下:

<code>// 處理數據流中的一條元素public abstract void processElement(I value, Context ctx, Collector out)/<code>

processElement方法處理數據流中的一條元素,並通過Collector輸出出來。Context是它的區別於FlatMapFunction等普通函數的特色,開發者可以通過Context來獲取時間戳,訪問TimerService,設置Timer。

另外一個接口是onTimer:

<code>// 時間到達後的回調函數public void onTimer(long timestamp, OnTimerContext ctx, Collector out)/<code>

這是一個回調函數,當到了“鬧鐘”時間,Flink會調用onTimer,並執行一些業務邏輯。這裡也有一個參數OnTimerContext,它實際上是繼承了前面的Context,與Context幾乎相同。

使用Timer的方法主要邏輯為:

  1. 在processElement方法中通過Context註冊一個未來的時間戳t。這個時間戳的語義可以是Processing Time,也可以是Event Time,根據業務需求來選擇。
  2. 在onTimer方法中實現一些邏輯,到達t時刻,onTimer方法被自動調用。

從Context中,我們可以獲取一個TimerService,這是一個訪問時間戳和Timer的接口。我們可以通過Context.timerService.registerProcessingTimeTimer或`Context.timerService.registerEventTimeTimer這兩個方法來註冊Timer,只需要傳入一個時間戳即可。我們可以通過Context.timerService.deleteProcessingTimeTimer和Context.timerService.deleteEventTimeTimer來刪除之前註冊的Timer。此外,還可以從中獲取當前的時間戳:Context.timerService.currentProcessingTime和Context.timerService.currentWatermark。從函數名看出,這裡都是兩兩出現的函數,兩個方法分別對應兩種時間語義。

注意,我們只能在KeyedStream上註冊Timer。每個Key下可以使用不同的時間戳註冊不同的Timer,但是每個Key的每個時間戳只能註冊一個Timer。如果想在一個DataStream上應用Timer,可以將所有數據映射到一個偽造的Key上,但這樣所有數據會流入一個算子子任務。

我們再次以股票股票交易場景來解釋如何使用Timer。一次股票交易包括:股票代號、時間戳、股票價格、成交量。我們現在想看一支股票10秒內是否一直連續上漲,如果一直上漲,則發送出一個提示。

<code>case class StockPrice(symbol: String, ts: Long, price: Double, volume: Int)class IncreaseAlertFunction(intervalMills: Long)extends KeyedProcessFunction[String, StockPrice, String] {  // 狀態:保存某支股票上次交易價格  lazy val lastPrice: ValueState[Double] =  getRuntimeContext.getState(    new ValueStateDescriptor[Double]("lastPrice", Types.of[Double])  )  // 狀態:保存某支股票的定時器時間戳  lazy val currentTimer: ValueState[Long] =  getRuntimeContext.getState(    new ValueStateDescriptor[Long]("timer", Types.of[Long])  )  override def processElement(stock: StockPrice,                              context: KeyedProcessFunction[String, StockPrice, String]#Context,                              out: Collector[String]): Unit = {    // 獲取lastPrice狀態中的數據,第一次使用時會被初始化為0    val prevPrice = lastPrice.value()    // 更新lastPrice    lastPrice.update(stock.price)    val curTimerTimestamp = currentTimer.value()    if (prevPrice == 0.0) {      // 第一次使用,不做任何處理    } else if (stock.price < prevPrice) {      // 如果新流入的股票價格降低,刪除Timer,否則該Timer一直保留      context.timerService().deleteEventTimeTimer(curTimerTimestamp)      currentTimer.clear()    } else if (stock.price >= prevPrice && curTimerTimestamp == 0) {      // 如果新流入的股票價格升高      // curTimerTimestamp為0表示currentTimer狀態中是空的,還沒有對應的Timer      // 新Timer = 當前時間 + interval      val timerTs = context.timestamp() + intervalMills      val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")      context.timerService().registerEventTimeTimer(timerTs)      // 更新currentTimer狀態,後續數據會讀取currentTimer,做相關判斷      currentTimer.update(timerTs)    }  }  override def onTimer(ts: Long,                       ctx: KeyedProcessFunction[String, StockPrice, String]#OnTimerContext,                       out: Collector[String]): Unit = {    val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")    out.collect("time: " + formatter.format(ts) + ", symbol: '" + ctx.getCurrentKey +                " monotonically increased for " + intervalMills + " millisecond.")    // 清空currentTimer狀態    currentTimer.clear()  }}/<code>

在主邏輯裡,通過下面的process算子調用KeyedProcessFunction:

<code>val inputStream: DataStream[StockPrice] = ...val warnings = inputStream      .keyBy(stock => stock.symbol)      // 調用process函數      .process(new IncreaseAlertFunction(10000))/<code>

Checkpoint時,Timer也會隨其他狀態數據一起保存起來。如果使用Processing Time語義設置一些Timer,重啟時這個時間戳已經過期,那些回調函數會立刻被調用執行。

側輸出SideOutput

ProcessFunction的另一大特色功能是可以將一部分數據發送到另外一個流中,而且輸出到的兩個流數據類型可以不一樣,我們通過OutputTag[T]來標記另外一個數據流。在ProcessFunction中這樣將某類數據過濾出來:

<code>class IncreaseAlertFunction(intervalMills: Long) extends KeyedProcessFunction[String, Stock, String] {  override def processElement(stock: Stock,                              context: KeyedProcessFunction[String, Stock, String]#Context,                              out: Collector[String]): Unit = {    // 其他業務邏輯...    // 定義一個OutputTag,Stock為這個SideOutput流的數據類型    val highVolumeOutput: OutputTag[Stock] = new OutputTag[Stock]("high-volume-trade")    if (stock.volume > 1000) {      // 將Stock篩選出來發送到該OutputTag下      context.output(highVolumeOutput, stock)    }  }}/<code>

在主邏輯中,通過下面的方法獲取側輸出:

<code>// 收集SideOutputval outputTag: OutputTag[Stock] = OutputTag[Stock]("high-volume-trade")val sideOutputStream: DataStream[Stock] = mainStream.getSideOutput(outputTag)/<code>

從這個例子中可以看到,KeyedProcessFunction的輸出類型是String,而SideOutput的輸出類型是Stock,兩者可以不同。

使用ProcessFunction實現Join

如果想從更細的粒度上實現兩個數據流的Join,可以使用CoProcessFunction或KeyedCoProcessFunction。這兩個函數都有processElement1和processElement2方法,分別對第一個數據流和第二個數據流的每個元素進行處理。兩個數據流的數據類型以及輸出類型可以互不相同。儘管數據來自兩個不同的流,但是他們可以共享同樣的狀態,所以可以參考下面的邏輯來實現Join:

  • 創建一到多個狀態,兩個數據流都能訪問到這些狀態,這裡以狀態a為例。
  • processElement1方法處理第一個數據流,更新狀態a。
  • processElement2方法處理第二個數據流,根據狀態a中的數據,生成相應的輸出。

我們這次將股票價格結合媒體評價兩個數據流一起討論,假設對於某支股票有一個媒體評價數據流,這個數據流包含了對該支股票的正負評價。兩支數據流一起流入KeyedCoProcessFunction,processElement2方法處理流入的媒體數據,將媒體評價更新到狀態mediaState上,processElement1方法處理流入的股票交易數據,獲取mediaState`狀態,生成到新的數據流。兩個方法分別處理兩個數據流,共享一個狀態,通過狀態來通信。

在主邏輯中,我們將兩個數據流connect,然後按照股票代號進行keyBy,進而使用process算子:

<code>val stockPriceRawStream: DataStream[StockPrice] = ...val mediaStatusStream: DataStream[Media] = ...val warnings = stockStream.connect(mediaStream)      .keyBy(0, 0)      // 調用process函數      .process(new AlertProcessFunction())/<code>

KeyedCoProcessFunction的具體實現:

<code>class JoinStockMediaProcessFunction extends KeyedCoProcessFunction[String, StockPrice, Media, StockPrice] {  // mediaState  private var mediaState: ValueState[String] = _  override def open(parameters: Configuration): Unit = {    // 從RuntimeContext中獲取狀態    mediaState = getRuntimeContext.getState(      new ValueStateDescriptor[String]("mediaStatusState", classOf[String]))  }  override def processElement1(stock: StockPrice,                               context: KeyedCoProcessFunction[String, StockPrice, Media, StockPrice]#Context,                               collector: Collector[StockPrice]): Unit = {    val mediaStatus = mediaState.value()    if (null != mediaStatus) {      val newStock = stock.copy(mediaStatus = mediaStatus)      collector.collect(newStock)    }  }  override def processElement2(media: Media,                               context: KeyedCoProcessFunction[String, StockPrice, Media, StockPrice]#Context,                               collector: Collector[StockPrice]): Unit = {    // 第二個流更新mediaState    mediaState.update(media.status)  }}/<code>

這個例子比較簡單,沒有使用Timer,實際的業務場景中狀態一般用到Timer將過期的狀態清除。很多互聯網APP的機器學習樣本拼接都可能依賴這個函數來實現:服務端的機器學習特徵是實時生成的,用戶在APP上的行為是交互後產生的,兩者屬於兩個不同的數據流,可以按照這個邏輯來將兩個數據流拼接起來,通過拼接更快得到下一輪機器學習的樣本數據。兩個數據流的中間數據放在狀態中,為避免狀態的無限增長,需要使用Timer將過期的狀態清除。

注意,使用Event Time時,兩個數據流必須都設置好Watermark,只設置一個流的Event Time和Watermark,無法在CoProcessFunction和KeyedCoProcessFunction中使用Timer功能,因為process算子無法確定自己應該以怎樣的時間來處理數據。


分享到:


相關文章: