12.21 Flink 全鏈路端到端延遲的測量方法

後臺回覆”ffa“可以查看 Flink 資料

一、背景

FLink Job端到端延遲是一個重要的指標,用來衡量Flink任務的整體性能和響應延遲(大部分流式應用,要求低延遲特性)。

通過流處理引擎競品對比,我們發現大部分流計算引擎產品,都在告警監控頁面,集成了全鏈路時延指標展示。

一些低延時的處理場景,例如用於登陸、用戶下單規則檢測,實時預測場景,需要一個可度量的Metric指標,來實時觀測、監控集群全鏈路時延情況。

二、源碼分析來源

1、本文的源碼分析基於FLink社區issue FLINK-3660,以及issue對應的pr源碼pull-2386,另外,個人也新增了實現源碼的說明。

2、其pr源碼中只涉及到了部分全鏈路時延實現代碼,因此,我在文章中總結了:

  • Source到Sink處理Latency Marker源碼

  • LatencyMarksEmitter 提交時延標記類

  • LatencyStats(時延直方圖Metric實現)源碼

  • 時延測量–整體架構圖

三、騰訊Oceanus監控指標參考

如下圖,紅色框線對應的數據延時,即我們描述的指標

Flink 全链路端到端延迟的测量方法
Flink 全链路端到端延迟的测量方法

四、Flink LatencyMarker實現思路

在webinterface中,加入流式job的端到端延遲是一個重要特性。因此,FLink社區最初的想法是在每個記錄的source上附加一個攝取時間( ingestion -time)時間戳。

然而,這為不使用monitor feature(監控功能)的用戶,帶來了額外開銷(每個元素+每個元素上的System.currentTimeMilis需要8個字節)。

因此,FLink社區最後決定,通過定期發送特殊事件來實現此功能,類似於通過拓撲發送水印watermark。

這些特殊事件(LatencyMarker)在source上以可配置發送間隔,並由任務Task轉發。Sink最後接收到LatencyMarks後,將比較LatencyMarker的時間戳與當前系統時間,以確定延遲。

LatencyMarker不會增加作業的延遲,但是LatencyMarker與常規記錄類似,可以被delay阻塞(例如反壓情況),因此LatencyMarker的延遲與Record延遲近似。

上述建議期望所有任務管理器TaskManager上的時鐘是同步的。否則,測量的延遲也包括TaskManager時鐘之間的偏移。

後續,我們可以嘗試通過使用JobManager作為計時服務中心(central timing service)來緩解這個問題。taskmanager將定期查詢JM的當前時間,以確定其時鐘的偏移量。

這個偏移量仍然包括TM和JM之間的網絡延遲,但是仍然比較好的測量時延。

五、Flink LatencyMarker實現源碼

本章節對應到pr源碼pull-2386的實現,這裡簡要說明。

Flink 全链路端到端延迟的测量方法

Flink源碼中,引入了一個新的StreamElement,稱為LatencyMarker。

與水印類似,LatencyMarker按配置的間隔從源發出。這個時間間隔的默認值是0毫秒,即不觸發 (配置項在ExecutionConfig#latencyTrackingInterval,名稱metrics.latency.interval),例如可以配置成2000毫秒觸發一次LatencyMarker發送。

LatencyMarker不能“多於”常規元素。這確保了測量的延遲接近於常規流元素的端到端延遲。

常規操作符Operator(不包括那些參與迭代的Operator)如果不是sink,就會轉發延遲標記LatencyMarker。

具有多個輸出channel的Operator,隨機選擇一個channel通道,將LatencyMarker發送給它。這可以確保每個LatencyMarker標記在系統中只存在一次,並且重新分區步驟不會導致傳輸的LatencyMarker數量激增。

<code>public class RecordWriterOutput{/<code><code> @Override/<code><code> public void emitLatencyMarker(LatencyMarker latencyMarker) {/<code><code> serializationDelegate.setInstance(latencyMarker);/<code>

<code>try {/<code><code> // 內部實現了隨機選擇通道/<code><code> recordWriter.randomEmit(serializationDelegate);/<code><code> }/<code><code> catch (Exception e) {/<code><code> throw new RuntimeException(e.getMessage, e);/<code><code> }/<code><code> }/<code><code>}/<code>

上述RecordWriterOutput#emitLatencyMarker會被StreamSource、AbstractStreamOperator調用,分別實現source和中間operator的延遲標記下發

如果操作符Operator是Sink,它將維護每個已知source實例的最後512個LatencyMarker信息。

每個已知source的最小/最大/平均值/p50/p95/p99時延,在sink的LatencyStats對象中,進行彙總(如果沒有任何輸出的Operator,就是是sink)。

此pr代碼,不會在web ui中顯示延遲。此外,目前還沒有確保系統時鐘同步的機制,因此如果硬件時鐘不正確,則延遲測量將不準確。

六、總結說明

1、LatencyMarker不參與window、MiniBatch的緩存計時,直接被中間Operator下發。

2、Metric路徑:TaskManagerJobMetricGroup/operator_id/operator_subtask_index/latency

3、每個中間Operator、以及Sink都會統計自己與Source節點的鏈路延遲,我們在監控頁面,一般展示Source至Sink鏈路延遲。

4、延遲粒度細分到Task,可以用來排查哪臺機器的Task時延偏高,進行對比和運維排查。

5、從實現原理來看,發送時延標記間隔配置大一些(例如20秒一次),一般不會影響系統處理業務數據的性能。

END


分享到:


相關文章: