Flink最難知識點解析:時間

時間、窗口、水印、遲到數據這四個知識點幾乎是Flink這個框架最難點。我之前發了很多文章來解釋。很多同學仍然理解不了。

事實上這跟Flink的文檔不全有直接關係。在這個問題上官網的資料不夠,學習成本巨大。

希望這篇文章能解答讀者在這個問題上的困惑。本文結合源碼和實例講解。

Flink支持根據事件時間處理,數據流中的每條數據都需要具有各自的時間戳,代表著數據的產生時間【事件時間】。

在分佈式系統中,數據流的採集通常都是有延遲的,可能是網絡原因啊,程序原因啊什麼的。所以當數據到達Flink程序中的時候,問題就來了,這些數據都要進行處理嗎?有可能其中一部分數據已經延遲了好幾個小時了,這對於實時性較強的業務場景是不能容忍的!

這時候水印就應運而生了,水印的目的就是為了解決亂序的數據問題,可以在時間窗口內根據事件時間來進行業務處理,對於亂序的有延遲的數據可以在一定時間範圍內進行等待,那這個時間範圍是怎麼計算的呢?

我們先來捋捋思路

數據在源源不斷的進入flink,我們設置好window的大小為5s,flink會以5s來將每分鐘劃分為連續的多個窗口。

則flink劃分的時間窗口為(左閉右開):

Flink最難知識點解析:時間/窗口/水印/遲到數據處理


進入flink的第一條數據會落在一個時間窗口內,假設數據的事件時間為13s(小時和分不重要,因為窗口大小的度量單位是秒),則落入的窗口是【10-15】。對於存在延遲的數據,我們能容忍的時間是3s,超過3s我就不等你了,繼續進行窗口操作。

這裡就要提到一個知識點:Window的觸發條件是什麼,什麼時候開始進行window操作?

  • 該窗口中存在數據
  • 事件時間到達窗口的結束時間

好,知道了window觸發條件後我們繼續分析,第一個條件肯定滿足的,只要有數據就行了。

第二個條件,窗口的結束時間是15s,但是我們加了水印,允許數據延遲3秒,換句話說就是本來在15秒這個窗口就應該開始統計數據了,但是為了等一些延遲的數據,我要在18s才開始進行統計

【10-15】窗口觸發的條件就是:存在一條數據的事件時間大於等於18s

下面我們用實例來驗證:

大概講解一下代碼的流程:

1、監聽某主機的9000端口,讀取socket數據(格式為 name:timestamp)

2、給當前進入flink程序的數據加上waterMark,值為eventTime-3s

3、根據name值進行分組,根據窗口大小為5s劃分窗口,依次統計窗口中各name值的數據

4、啟動Job

還記得我們開始說的嗎?

<code>flink會根據window的間隔時間進行時間窗口範圍的劃分(與數據進入flink的時間無關)/<code>

程序中我們設置的window間隔時間為5s,則窗口劃分的結果為:【0-5】【5-10】【10-15】...【50-55】【55-60】,該窗口都是左閉右開區間。

那麼我們開始在主機s102上輸入數據:

Flink最難知識點解析:時間/窗口/水印/遲到數據處理

控制檯輸出:

Flink最難知識點解析:時間/窗口/水印/遲到數據處理

可以看出eventTime為10s,waterMark為:7s(10-3),所屬的window窗口應該是【10-15】,按照我們之前說的,如果想要觸發window操作,應該輸入一條數據,該數據的eventTime值剛好等於18【waterMark的值為3,使得window結束的時間推遲3s故為:(15+3)】,那我們繼續輸入數據:

Flink最難知識點解析:時間/窗口/水印/遲到數據處理

控制檯輸出:

Flink最難知識點解析:時間/窗口/水印/遲到數據處理

一直輸入到16都還沒觸發window操作,我們繼續輸入:

Flink最難知識點解析:時間/窗口/水印/遲到數據處理

查看控制檯輸出:

Flink最難知識點解析:時間/窗口/水印/遲到數據處理

可以看出當輸入eventTime為18的數據時就觸發了window操作,window的區間確實是【10-15】,也成功統計出了該範圍內的數據。

那我們繼續輸入數據,看看什麼時候觸發下一個窗口:

Flink最難知識點解析:時間/窗口/水印/遲到數據處理

控制檯輸出:

Flink最難知識點解析:時間/窗口/水印/遲到數據處理

看來確實是如果出現一條數據,使得eventTime=window結束時間+waterMark即可觸發window操作

總結一下:

水印的目的:處理亂序的數據問題 需要結合window來處理

window觸發的條件:

1、window中必須要數據

2、waterMark值=window的結束時間/event-time=window的結束時間+允許亂序的時間(waterMark值)

對於延遲的數據Flink也有自己的解決辦法,主要的辦法是給定一個允許延遲的時間,在該時間範圍內仍可以接受處理延遲數據

設置允許延遲的時間是通過allowedLateness(lateness: Time)設置

保存延遲數據則是通過sideOutputLateData(outputTag: OutputTag[T])保存

獲取延遲數據是通過DataStream.getSideOutput(tag: OutputTag[X])獲取


1、allowedLateness(lateness: Time)

Flink最難知識點解析:時間/窗口/水印/遲到數據處理


該方法傳入一個Time值,設置允許數據遲到的時間,這個時間和waterMark中的時間概念不同。再來回顧一下,

waterMark=數據的事件時間-允許亂序時間值

隨著新數據的到來,waterMark的值會更新為最新數據事件時間-允許亂序時間值,但是如果這時候來了一條歷史數據,waterMark值則不會更新。總的來說,waterMark是為了能接收到儘可能多的亂序數據。

那這裡的Time值呢?主要是為了等待遲到的數據,在一定時間範圍內,如果屬於該窗口的數據到來,仍會進行計算,後面會對計算方式仔細說明

注意:該方法只針對於基於event-time的窗口,如果是基於processing-time,並且指定了非零的time值則會拋出異常

2、sideOutputLateData(outputTag: OutputTag[T])

Flink最難知識點解析:時間/窗口/水印/遲到數據處理


該方法是將遲來的數據保存至給定的outputTag參數,而OutputTag則是用來標記延遲數據的一個對象。

3、DataStream.getSideOutput(tag: OutputTag[X])

通過window等操作返回的DataStream調用該方法,傳入標記延遲數據的對象來獲取延遲的數據

4、對延遲數據的理解

延遲數據是指:

在當前窗口【假設窗口範圍為10-15】已經計算之後,又來了一個屬於該窗口的數據【假設事件時間為13】,這時候仍會觸發window操作,這種數據就稱為延遲數據。

那麼問題來了,延遲時間怎麼計算呢?

假設窗口範圍為10-15,延遲時間為2s,則只要waterMark<15+2,並且屬於該窗口,就能觸發window操作。而如果來了一條數據使得waterMark>=15+2,10-15這個窗口就不能再觸發window操作,即使新來的數據的event-time<15+2+3

5、代碼實例講解

大概講解一下代碼的流程:

1、監聽某主機的9000端口,讀取socket數據(格式為 name:timestamp)

2、給當前進入flink程序的數據加上waterMark,值為eventTime-3s

3、根據name值進行分組,根據窗口大小為5s劃分窗口,設置允許遲到時間為2s,依次統計窗口中各name值的數據

4、輸出統計結果以及遲到數據

5、啟動Job

接下來開始輸入數據進行測試驗證:

Flink最難知識點解析:時間/窗口/水印/遲到數據處理

Flink最難知識點解析:時間/窗口/水印/遲到數據處理

可以看到window範圍為【15-20】,這時候我們再輸入幾條屬於該範圍的數據:

Flink最難知識點解析:時間/窗口/水印/遲到數據處理

Flink最難知識點解析:時間/窗口/水印/遲到數據處理

輸入了事件時間為17、16、15三條數據,都觸發了window操作,那我們試著輸入一下窗口範圍為【10-15】的數據:

Flink最難知識點解析:時間/窗口/水印/遲到數據處理

Flink最難知識點解析:時間/窗口/水印/遲到數據處理

窗口範圍為【10-15】的數據則屬於遲到的數據,已經超過了最大等待時間,我們可以來試著計算一下允許上個窗口遲到數據的waterMark值

窗口結束時間+延遲時間=最大waterMark值

15 + 2 = 17

當前的waterMark值為20,大於17,所以窗口範圍為10-15的數據已經是遲到的數據了

再來計算一下窗口時間範圍為15-20的臨界值:

20 + 2 = 22

即當waterMark上漲到22,15-20窗口範圍內的數據就屬於遲到數據,不能再參與計算了。

記住我們算出的臨界值22,繼續輸入數據測試:

Flink最難知識點解析:時間/窗口/水印/遲到數據處理

Flink最難知識點解析:時間/窗口/水印/遲到數據處理

輸入數據A時,waterMark上漲至21,此時輸入屬於15-20窗口範圍內的數據B,依然能觸發窗口操作;

輸入數據C,waterMark上漲至22,等於剛才我們算出來的臨界值,此時輸入,數據B,則已不能觸發窗口操作,屬於遲到的數據。

最後,總結一下flink對於延遲數據的處理:

如果延遲的數據有業務需要,則設置好允許延遲的時間,每個窗口都有屬於自己的最大等待延遲數據的時間限制:

窗口結束時間+延遲時間=最大waterMark值

即當waterMark值大於的上述計算出的最大waterMark值,該窗口內的數據就屬於遲到的數據,無法參與window計算。

看完覺得不錯可以關注一下小編,後續還會持續更新幹貨文章!!


分享到:


相關文章: