Flink from Getting Started to Giving Up (9): Understanding Window and Time Concepts

Now we come to windows. First, a recap of how window was defined in the introductory concepts post:

Definition of Window

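  • window: a mechanism that carves a finite set out of an unbounded stream so that operations can run on a bounded data set. Windows are either time-based or count-based. The operator maps KeyedStream→WindowedStream (note that the stream type changes). Windows can be defined on an already partitioned KeyedStream; they group the data of each key by some characteristic (for example, the data that arrived within the last 5 seconds). See the windows documentation at https://flink.xskoo.com/dev/stream/operators/windows.html for a complete description.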
<code>
dataStream
    .keyBy(0)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)));
</code>
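  • tumbling time windows: windows do not overlap, so each element belongs to exactly one window
  • sliding time windows: windows overlap (when the slide is smaller than the window size), so an element can belong to several windows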
<code>
// Tumbling time window of 1 minute
data.keyBy(1)
    .timeWindow(Time.minutes(1))
    .sum(1);

// Sliding time window: size 1 minute, slide 30 seconds
data.keyBy(1)
    .timeWindow(Time.minutes(1), Time.seconds(30))
    .sum(1);
</code>
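  • timeWindow: as described above, aggregates stream data by time. For example, a one-minute tumbling time window collects the elements of one minute and, once that minute has passed, applies a function to all elements in the window.
  • windowAll: DataStream→AllWindowedStream; windowAll can be applied directly to non-partitioned data. Windows can be defined on a regular DataStream; they group all stream events by some characteristic (for example, the data that arrived within the last 5 seconds). See the windows documentation at https://flink.xskoo.com/dev/stream/operators/windows.html for a complete description.
<code>    **Warning:** In many cases this is a **non-parallel** transformation. All records will be gathered in a single task of the windowAll operator.</code>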
<code>
dataStream
    .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)));
</code>
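
To make the difference between tumbling and sliding windows concrete, here is a minimal sketch in plain Java (no Flink dependency) of how a timestamp could be mapped to the window or windows that contain it. The class and method names (`WindowAssignmentSketch`, `tumblingStart`, `slidingStarts`) are hypothetical, and the arithmetic assumes windows aligned to epoch 0 with no offset; Flink's own assigners may differ in detail.

```java
import java.util.ArrayList;
import java.util.List;

final class WindowAssignmentSketch {

    // Start of the single tumbling window [start, start + size) that contains the timestamp.
    static long tumblingStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    // Starts of every sliding window [start, start + size) that contains the timestamp,
    // listed from the latest window to the earliest.
    static List<Long> slidingStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestampMs - Math.floorMod(timestampMs, slideMs);
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long ts = 95_000L; // an element stamped 95 seconds after epoch

        // 1-minute tumbling window: exactly one window
        System.out.println(tumblingStart(ts, 60_000L));

        // 1-minute window sliding every 30 seconds: two overlapping windows
        System.out.println(slidingStarts(ts, 60_000L, 30_000L));
    }
}
```

With a one-minute size and a 30-second slide, the element lands in two windows, which is the overlap the sliding-window bullet describes; with a tumbling window it lands in one.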

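What the official documentation says about windows:

Windows are at the heart of processing infinite streams. Windows split the stream into “buckets” of finite size, over which computations can be applied. This document focuses on how windowing is performed in Flink and how programmers can get the most out of the functionality it offers.

The general structure of a windowed Flink program is shown below. The first snippet refers to keyed streams, the second to non-keyed streams. As you can see, the only difference is that window(...) applies to the KeyedStream produced by keyBy, while windowAll(...) applies to a non-keyed stream.

Keyed Windows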

<code>stream
       .keyBy(...)               <-  keyed versus non-keyed windows
       .window(...)              <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/fold/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"
</code>

Non-Keyed Windows

<code>stream
       .windowAll(...)           <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/fold/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"
</code>

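In the above, the commands in square brackets ([...]) are optional. This shows that Flink lets you customize the windowing logic in many different ways so that it best fits your needs.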
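
As an illustration of how two of the optional pieces, allowedLateness and sideOutputLateData, fit together, here is a simplified model in plain Java. It is a sketch under assumptions, not Flink code: the names (`LatenessSketch`, `Route`, `route`) are hypothetical, it looks at a single event-time window, and it assumes a side output has been configured (without one, the late element would simply be dropped).

```java
final class LatenessSketch {

    enum Route { WINDOW, SIDE_OUTPUT }

    // Decide where an element belonging to the window ending at windowEndMs goes,
    // given the current watermark. windowEndMs is exclusive, so the last timestamp
    // that belongs to the window is windowEndMs - 1.
    static Route route(long windowEndMs, long allowedLatenessMs, long watermarkMs) {
        long cleanupTime = (windowEndMs - 1) + allowedLatenessMs;
        // Once the watermark has reached the cleanup time, the window state is gone
        // and the element can no longer be added to it.
        return cleanupTime <= watermarkMs ? Route.SIDE_OUTPUT : Route.WINDOW;
    }

    public static void main(String[] args) {
        long windowEnd = 10_000L; // window [5000, 10000)
        long lateness = 2_000L;   // assumed allowed lateness of 2 seconds

        System.out.println(route(windowEnd, lateness, 9_000L));  // window still open
        System.out.println(route(windowEnd, lateness, 11_000L)); // late, but within allowed lateness
        System.out.println(route(windowEnd, lateness, 12_000L)); // too late
    }
}
```

With the default lateness of zero, the same check sends an element to the side output as soon as the watermark passes the end of its window.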

window vs. timeWindow

KeyedStream in Flink also has a timeWindow method, which is a wrapper built on top of window. Here is its implementation:

<code>
public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size) {
    if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
        return window(TumblingProcessingTimeWindows.of(size));
    } else {
        return window(TumblingEventTimeWindows.of(size));
    }
}

public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size, Time slide) {
    if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
        return window(SlidingProcessingTimeWindows.of(size, slide));
    } else {
        return window(SlidingEventTimeWindows.of(size, slide));
    }
}
</code>

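As you can see, both tumbling and sliding time windows end up calling window underneath; all timeWindow adds is choosing the processing-time or event-time assigner based on the environment's time characteristic. So for time-based windows, timeWindow can in most cases be used in place of window. Note that Flink 1.12 and later deprecate timeWindow in favor of calling window with an explicit assigner.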

Time types in Flink

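The code above mentions Processing Time and Event Time; there is in fact a third kind, Ingestion Time. The explanations below are taken from the Flink website and from the post “Flink 中幾種 Time 詳解” in the 《從0到1學習Flink》 series (http://www.54tianzhisheng.cn/):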

Processing Time:

Processing time refers to the system time of the machine that is executing the respective operation.

When a streaming program runs on processing time, all time-based operations (like time windows) will use the system clock of the machines that run the respective operator. An hourly processing time window will include all records that arrived at a specific operator between the times when the system clock indicated the full hour. For example, if an application begins running at 9:15am, the first hourly processing time window will include events processed between 9:15am and 10:00am, the next window will include events processed between 10:00am and 11:00am, and so on.

Processing time is the simplest notion of time and requires no coordination between streams and machines. It provides the best performance and the lowest latency. However, in distributed and asynchronous environments processing time does not provide determinism, because it is susceptible to the speed at which records arrive in the system (for example from the message queue), to the speed at which the records flow between operators inside the system, and to outages (scheduled, or otherwise).

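Put simply, processing time is the time at which a record is processed on the Flink cluster, measured by the clock of the machine doing the processing.

As noted above, though, in distributed and asynchronous settings processing time cannot guarantee that the time a record is processed matches the time it actually occurred: messages from an MQ may arrive out of order or only after retries, and inside Flink, under parallel execution, differences in how fast individual threads run can cause a record that was produced later to be processed first.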
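
The 9:15am example can be checked with a few lines of plain Java. This is only a sketch of the alignment rule described above (hourly windows end on the full hour, regardless of when the job starts); the class name `HourlyProcessingWindowSketch`, the method `windowEnd`, and the date used are hypothetical.

```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

final class HourlyProcessingWindowSketch {

    // End (exclusive) of the hourly window that contains the given wall-clock time.
    static LocalDateTime windowEnd(LocalDateTime wallClock) {
        return wallClock.truncatedTo(ChronoUnit.HOURS).plusHours(1);
    }

    public static void main(String[] args) {
        // Assumed job start time: 9:15am on an arbitrary day
        LocalDateTime jobStart = LocalDateTime.of(2019, 1, 1, 9, 15);

        LocalDateTime firstEnd = windowEnd(jobStart);
        System.out.println(firstEnd);            // end of the first, shortened window

        System.out.println(windowEnd(firstEnd)); // end of the next, full-hour window
    }
}
```

The first window is shorter than an hour only because the job started partway through it; the window boundaries themselves stay on the full hour.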

Event Time:

Event time is the time that each individual event occurred on its producing device. This time is typically embedded within the records before they enter Flink, and that event timestamp can be extracted from each record. In event time, the progress of time depends on the data, not on any wall clocks. Event time programs must specify how to generate Event Time Watermarks, which is the mechanism that signals progress in event time. This watermarking mechanism is described at https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_time.html#event-time-and-watermarks.

In a perfect world, event time processing would yield completely consistent and deterministic results, regardless of when events arrive, or their ordering. However, unless the events are known to arrive in-order (by timestamp), event time processing incurs some latency while waiting for out-of-order events. As it is only possible to wait for a finite period of time, this places a limit on how deterministic event time applications can be.

Assuming all of the data has arrived, event time operations will behave as expected, and produce correct and consistent results even when working with out-of-order or late events, or when reprocessing historic data. For example, an hourly event time window will contain all records that carry an event timestamp that falls into that hour, regardless of the order in which they arrive, or when they are processed. (See the section on late elements, https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_time.html#late-elements, for more information.)

Note that sometimes when event time programs are processing live data in real-time, they will use some processing time operations in order to guarantee that they are progressing in a timely fashion.

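Put simply, event time is the time at which the data was produced on its own business server; it has nothing to do with Flink.

Because event-time data can arrive out of order, Flink has to wait for a while when processing it so that out-of-order events can still be included, and this waiting introduces latency.

Also, because event-time processing is independent of Flink's own clock, a watermark must be specified to indicate how far event time has progressed.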
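
The relationship between out-of-order events, waiting, and watermarks can be sketched in plain Java. The following is an illustrative model, not Flink's API: the class `WatermarkSketch` and its methods are hypothetical, and it assumes one common strategy in which the watermark trails the largest timestamp seen so far by a fixed out-of-orderness bound.

```java
final class WatermarkSketch {

    private final long maxOutOfOrdernessMs;
    private long maxTimestampMs = Long.MIN_VALUE;

    WatermarkSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    // Record an event timestamp; an out-of-order event never moves the maximum backwards.
    void onEvent(long eventTimestampMs) {
        maxTimestampMs = Math.max(maxTimestampMs, eventTimestampMs);
    }

    // Current watermark: no event with a timestamp at or below this value is expected any more.
    long currentWatermark() {
        if (maxTimestampMs == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // nothing seen yet
        }
        return maxTimestampMs - maxOutOfOrdernessMs - 1;
    }

    // A window [start, end) may fire once the watermark reaches its last timestamp, end - 1.
    boolean canFire(long windowEndMs) {
        return currentWatermark() >= windowEndMs - 1;
    }

    public static void main(String[] args) {
        WatermarkSketch wm = new WatermarkSketch(2_000L); // assumed bound: 2 seconds of disorder
        long windowEnd = 10_000L;                          // window [5000, 10000)

        // 8000 arrives after 9500, i.e. out of order
        for (long ts : new long[] {6_000L, 9_500L, 8_000L, 11_000L, 12_000L}) {
            wm.onEvent(ts);
            System.out.println("event=" + ts
                    + " watermark=" + wm.currentWatermark()
                    + " fire=" + wm.canFire(windowEnd));
        }
    }
}
```

In this model the window does not fire when the first event beyond its end arrives, only once the watermark has caught up, and that gap is the latency the text refers to. A larger bound tolerates more disorder at the cost of later results.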

Ingestion Time:

Ingestion time is the time that events enter Flink. At the source operator each record gets the source’s current time as a timestamp, and time-based operations (like time windows) refer to that timestamp.

Ingestion time sits conceptually in between event time and processing time. Compared to processing time, it is slightly more expensive, but gives more predictable results. Because ingestion time uses stable timestamps (assigned once at the source), different window operations over the records will refer to the same timestamp, whereas in processing time each window operator may assign the record to a different window (based on the local system clock and any transport delay).

Compared to event time, ingestion time programs cannot handle any out-of-order events or late data, but the programs don’t have to specify how to generate watermarks.

Internally, ingestion time is treated much like event time, but with automatic timestamp assignment and automatic watermark generation.

Put simply, ingestion time sits between processing time and event time: it is the time at which data enters Flink.
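
The point about stable timestamps can be illustrated with simulated clocks in plain Java. This is a sketch under stated assumptions: the class `IngestionVsProcessingSketch`, the clock value, and the two transport delays are all made up for illustration, and windows are one-minute tumbling windows aligned to epoch 0.

```java
final class IngestionVsProcessingSketch {

    // Start of the tumbling window [start, start + size) that contains the timestamp.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    public static void main(String[] args) {
        long size = 60_000L;
        long sourceClock = 59_000L;     // simulated time at which the record enters the source
        long delayToOperatorA = 500L;   // hypothetical transport delays to two window operators
        long delayToOperatorB = 1_500L;

        // Ingestion time: the timestamp is assigned once at the source and travels with the record,
        // so both operators compute the same window.
        long ingestionTs = sourceClock;
        System.out.println("ingestion  A: " + windowStart(ingestionTs, size));
        System.out.println("ingestion  B: " + windowStart(ingestionTs, size));

        // Processing time: each operator reads its own clock when the record arrives,
        // so a record close to a window boundary can land in different windows.
        System.out.println("processing A: " + windowStart(sourceClock + delayToOperatorA, size));
        System.out.println("processing B: " + windowStart(sourceClock + delayToOperatorB, size));
    }
}
```

Under ingestion time both operators agree on the window; under processing time the slower path pushes the record into the next window.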

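Time diagram

The figure below shows where each kind of time is produced (from the official website):

[Figure: where each kind of time is produced]

The figure is drawn very clearly and makes it easy to see where each type of time originates.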

References:

http://www.54tianzhisheng.cn/2018/12/11/Flink-time/

http://www.54tianzhisheng.cn/2018/12/08/Flink-Stream-Windows/


