之前提到的一些算子和函数能够进行一些时间上的操作,但是不能获取算子当前的Processing Time或者是Watermark时间戳,调用起来简单但功能相对受限。如果想获取数据流中Watermark的时间戳,或者在时间上前后穿梭,需要使用ProcessFunction系列函数,它们是Flink体系中最底层的API,提供了对数据流更细粒度的操作权限。Flink SQL是基于这些函数实现的,一些需要高度个性化的业务场景也需要使用这些函数。
目前,这个系列函数主要包括KeyedProcessFunction、ProcessFunction、CoProcessFunction、KeyedCoProcessFunction、ProcessJoinFunction和ProcessWindowFunction等多种函数,这些函数各有侧重,但核心功能比较相似,主要包括两点:
- 状态:我们可以在这些函数中访问和更新Keyed State 。
- 定时器(Timer):像定闹钟一样设置定时器,我们可以在时间维度上设计更复杂的业务逻辑。
状态的介绍可以参考我的文章: ,这里我们重点讲解一下的使用ProcessFunction其他几个特色功能。本文所有代码都上传到了我的github:https://github.com/luweizheng/flink-tutorials
Timer的使用方法
我们可以把Timer理解成一个闹钟,使用前先在Timer中注册一个未来的时间,当这个时间到达,闹钟会“响起”,程序会执行一个回调函数,回调函数中执行一定的业务逻辑。这里以KeyedProcessFunction为例,来介绍Timer的注册和使用。
ProcessFunction有两个重要的接口processElement和onTimer,其中processElement函数在源码中的Java签名如下:
<code>// 处理数据流中的一条元素public abstract void processElement(I value, Context ctx, Collectorout) /<code>
processElement方法处理数据流中的一条元素,并通过Collector
另外一个接口是onTimer:
<code>// 时间到达后的回调函数public void onTimer(long timestamp, OnTimerContext ctx, Collectorout) /<code>
这是一个回调函数,当到了“闹钟”时间,Flink会调用onTimer,并执行一些业务逻辑。这里也有一个参数OnTimerContext,它实际上是继承了前面的Context,与Context几乎相同。
使用Timer的方法主要逻辑为:
- 在processElement方法中通过Context注册一个未来的时间戳t。这个时间戳的语义可以是Processing Time,也可以是Event Time,根据业务需求来选择。
- 在onTimer方法中实现一些逻辑,到达t时刻,onTimer方法被自动调用。
从Context中,我们可以获取一个TimerService,这是一个访问时间戳和Timer的接口。我们可以通过Context.timerService.registerProcessingTimeTimer或`Context.timerService.registerEventTimeTimer这两个方法来注册Timer,只需要传入一个时间戳即可。我们可以通过Context.timerService.deleteProcessingTimeTimer和Context.timerService.deleteEventTimeTimer来删除之前注册的Timer。此外,还可以从中获取当前的时间戳:Context.timerService.currentProcessingTime和Context.timerService.currentWatermark。从函数名看出,这里都是两两出现的函数,两个方法分别对应两种时间语义。
注意,我们只能在KeyedStream上注册Timer。每个Key下可以使用不同的时间戳注册不同的Timer,但是每个Key的每个时间戳只能注册一个Timer。如果想在一个DataStream上应用Timer,可以将所有数据映射到一个伪造的Key上,但这样所有数据会流入一个算子子任务。
我们再次以股票股票交易场景来解释如何使用Timer。一次股票交易包括:股票代号、时间戳、股票价格、成交量。我们现在想看一支股票10秒内是否一直连续上涨,如果一直上涨,则发送出一个提示。
<code>case class StockPrice(symbol: String, ts: Long, price: Double, volume: Int)class IncreaseAlertFunction(intervalMills: Long)extends KeyedProcessFunction[String, StockPrice, String] { // 状态:保存某支股票上次交易价格 lazy val lastPrice: ValueState[Double] = getRuntimeContext.getState( new ValueStateDescriptor[Double]("lastPrice", Types.of[Double]) ) // 状态:保存某支股票的定时器时间戳 lazy val currentTimer: ValueState[Long] = getRuntimeContext.getState( new ValueStateDescriptor[Long]("timer", Types.of[Long]) ) override def processElement(stock: StockPrice, context: KeyedProcessFunction[String, StockPrice, String]#Context, out: Collector[String]): Unit = { // 获取lastPrice状态中的数据,第一次使用时会被初始化为0 val prevPrice = lastPrice.value() // 更新lastPrice lastPrice.update(stock.price) val curTimerTimestamp = currentTimer.value() if (prevPrice == 0.0) { // 第一次使用,不做任何处理 } else if (stock.price < prevPrice) { // 如果新流入的股票价格降低,删除Timer,否则该Timer一直保留 context.timerService().deleteEventTimeTimer(curTimerTimestamp) currentTimer.clear() } else if (stock.price >= prevPrice && curTimerTimestamp == 0) { // 如果新流入的股票价格升高 // curTimerTimestamp为0表示currentTimer状态中是空的,还没有对应的Timer // 新Timer = 当前时间 + interval val timerTs = context.timestamp() + intervalMills val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS") context.timerService().registerEventTimeTimer(timerTs) // 更新currentTimer状态,后续数据会读取currentTimer,做相关判断 currentTimer.update(timerTs) } } override def onTimer(ts: Long, ctx: KeyedProcessFunction[String, StockPrice, String]#OnTimerContext, out: Collector[String]): Unit = { val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS") out.collect("time: " + formatter.format(ts) + ", symbol: '" + ctx.getCurrentKey + " monotonically increased for " + intervalMills + " millisecond.") // 清空currentTimer状态 currentTimer.clear() }}/<code>
在主逻辑里,通过下面的process算子调用KeyedProcessFunction:
<code>val inputStream: DataStream[StockPrice] = ...val warnings = inputStream .keyBy(stock => stock.symbol) // 调用process函数 .process(new IncreaseAlertFunction(10000))/<code>
Checkpoint时,Timer也会随其他状态数据一起保存起来。如果使用Processing Time语义设置一些Timer,重启时这个时间戳已经过期,那些回调函数会立刻被调用执行。
侧输出SideOutput
ProcessFunction的另一大特色功能是可以将一部分数据发送到另外一个流中,而且输出到的两个流数据类型可以不一样,我们通过OutputTag[T]来标记另外一个数据流。在ProcessFunction中这样将某类数据过滤出来:
<code>class IncreaseAlertFunction(intervalMills: Long) extends KeyedProcessFunction[String, Stock, String] { override def processElement(stock: Stock, context: KeyedProcessFunction[String, Stock, String]#Context, out: Collector[String]): Unit = { // 其他业务逻辑... // 定义一个OutputTag,Stock为这个SideOutput流的数据类型 val highVolumeOutput: OutputTag[Stock] = new OutputTag[Stock]("high-volume-trade") if (stock.volume > 1000) { // 将Stock筛选出来发送到该OutputTag下 context.output(highVolumeOutput, stock) } }}/<code>
在主逻辑中,通过下面的方法获取侧输出:
<code>// 收集SideOutputval outputTag: OutputTag[Stock] = OutputTag[Stock]("high-volume-trade")val sideOutputStream: DataStream[Stock] = mainStream.getSideOutput(outputTag)/<code>
从这个例子中可以看到,KeyedProcessFunction的输出类型是String,而SideOutput的输出类型是Stock,两者可以不同。
使用ProcessFunction实现Join
如果想从更细的粒度上实现两个数据流的Join,可以使用CoProcessFunction或KeyedCoProcessFunction。这两个函数都有processElement1和processElement2方法,分别对第一个数据流和第二个数据流的每个元素进行处理。两个数据流的数据类型以及输出类型可以互不相同。尽管数据来自两个不同的流,但是他们可以共享同样的状态,所以可以参考下面的逻辑来实现Join:
- 创建一到多个状态,两个数据流都能访问到这些状态,这里以状态a为例。
- processElement1方法处理第一个数据流,更新状态a。
- processElement2方法处理第二个数据流,根据状态a中的数据,生成相应的输出。
我们这次将股票价格结合媒体评价两个数据流一起讨论,假设对于某支股票有一个媒体评价数据流,这个数据流包含了对该支股票的正负评价。两支数据流一起流入KeyedCoProcessFunction,processElement2方法处理流入的媒体数据,将媒体评价更新到状态mediaState上,processElement1方法处理流入的股票交易数据,获取mediaState`状态,生成到新的数据流。两个方法分别处理两个数据流,共享一个状态,通过状态来通信。
在主逻辑中,我们将两个数据流connect,然后按照股票代号进行keyBy,进而使用process算子:
<code>val stockPriceRawStream: DataStream[StockPrice] = ...val mediaStatusStream: DataStream[Media] = ...val warnings = stockStream.connect(mediaStream) .keyBy(0, 0) // 调用process函数 .process(new AlertProcessFunction())/<code>
KeyedCoProcessFunction的具体实现:
<code>class JoinStockMediaProcessFunction extends KeyedCoProcessFunction[String, StockPrice, Media, StockPrice] { // mediaState private var mediaState: ValueState[String] = _ override def open(parameters: Configuration): Unit = { // 从RuntimeContext中获取状态 mediaState = getRuntimeContext.getState( new ValueStateDescriptor[String]("mediaStatusState", classOf[String])) } override def processElement1(stock: StockPrice, context: KeyedCoProcessFunction[String, StockPrice, Media, StockPrice]#Context, collector: Collector[StockPrice]): Unit = { val mediaStatus = mediaState.value() if (null != mediaStatus) { val newStock = stock.copy(mediaStatus = mediaStatus) collector.collect(newStock) } } override def processElement2(media: Media, context: KeyedCoProcessFunction[String, StockPrice, Media, StockPrice]#Context, collector: Collector[StockPrice]): Unit = { // 第二个流更新mediaState mediaState.update(media.status) }}/<code>
这个例子比较简单,没有使用Timer,实际的业务场景中状态一般用到Timer将过期的状态清除。很多互联网APP的机器学习样本拼接都可能依赖这个函数来实现:服务端的机器学习特征是实时生成的,用户在APP上的行为是交互后产生的,两者属于两个不同的数据流,可以按照这个逻辑来将两个数据流拼接起来,通过拼接更快得到下一轮机器学习的样本数据。两个数据流的中间数据放在状态中,为避免状态的无限增长,需要使用Timer将过期的状态清除。
注意,使用Event Time时,两个数据流必须都设置好Watermark,只设置一个流的Event Time和Watermark,无法在CoProcessFunction和KeyedCoProcessFunction中使用Timer功能,因为process算子无法确定自己应该以怎样的时间来处理数据。
閱讀更多 皮皮魯的AI星球 的文章