Flink水印機制(watermark)

Flink流處理時間方式

  • EventTime 時間發生的時間,例如:點擊網站上的某個鏈接的時間
  • IngestionTime 某個Flink節點的source operator接收到數據的時間,例如:某個source消費到kafka中的數據
  • ProcessingTime 某個Flink節點執行某個operation的時間,例如:timeWindow接收到數據的時間
Flink水印機制(watermark)

設置Flink流處理的時間類型

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

問題

Flink水印機制(watermark)

1. 使用時間窗口來統計10分鐘內的用戶流量

2. 有一個時間窗口

  • 開始時間為:2017-03-19 10:00:00
  • 結束時間為:2017-03-19 10:10:00

3. 有一個數據,因為網絡延遲

  • 事件發生的時間為:2017-03-19 10: 10 :00
  • 但進入到窗口的時間為:2017-03-19 10:10: 02 ,延遲了2秒中

4. 時間窗口並沒有將 59 這個數據計算進來,導致數據統計不正確

這種處理方式,根據消息進入到window時間,來進行計算。在網絡有延遲的時候,會引起計算誤差。

水印(watermark)

水印就是一個時間戳,可以給每個消息添加一個 允許一定延遲 的時間戳

  • 窗口可以繼續計算一定時間範圍內延遲的消息
  • 添加水印後,窗口會等 5 秒,再執行計算。若超過5秒,則捨棄。
  • 窗口執行計算時間由 水印時間 來觸發,當接收到消息的 watermark >= endtime ,觸發計算
Flink水印機制(watermark)

Flink提供添加水印的API

<code> val watermarkData: DataStream[Message] =

clicklogDataStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[Message]

{

var currentTimestamp: Long = 0L


val maxDelayTime = 5000L

var watermark: Watermark = null

// 獲取當前的水印

override def getCurrentWatermark = {

watermark = new Watermark(currentTimestamp - maxDelayTime)

watermark

}

// 時間戳抽取操作

override def extractTimestamp(t: Message, l: Long) = {

val timeStamp = t.timestamp

currentTimestamp = Math.max(timeStamp, currentTimestamp)

currentTimestamp

}

})/<code>


分享到:


相關文章: