後臺回覆”ffa“可以查看 Flink 資料
一、背景
FLink Job端到端延遲是一個重要的指標,用來衡量Flink任務的整體性能和響應延遲(大部分流式應用,要求低延遲特性)。
通過流處理引擎競品對比,我們發現大部分流計算引擎產品,都在告警監控頁面,集成了全鏈路時延指標展示。
一些低延時的處理場景,例如用於登陸、用戶下單規則檢測,實時預測場景,需要一個可度量的Metric指標,來實時觀測、監控集群全鏈路時延情況。
二、源碼分析來源
1、本文的源碼分析基於FLink社區issue FLINK-3660,以及issue對應的pr源碼pull-2386,另外,個人也新增了實現源碼的說明。
2、其pr源碼中只涉及到了部分全鏈路時延實現代碼,因此,我在文章中總結了:
Source到Sink處理Latency Marker源碼
-
LatencyMarksEmitter 提交時延標記類
LatencyStats(時延直方圖Metric實現)源碼
時延測量–整體架構圖
三、騰訊Oceanus監控指標參考
如下圖,紅色框線對應的數據延時,即我們描述的指標
四、Flink LatencyMarker實現思路
在webinterface中,加入流式job的端到端延遲是一個重要特性。因此,FLink社區最初的想法是在每個記錄的source上附加一個攝取時間( ingestion -time)時間戳。
然而,這為不使用monitor feature(監控功能)的用戶,帶來了額外開銷(每個元素+每個元素上的System.currentTimeMilis需要8個字節)。
因此,FLink社區最後決定,通過定期發送特殊事件來實現此功能,類似於通過拓撲發送水印watermark。
這些特殊事件(LatencyMarker)在source上以可配置發送間隔,並由任務Task轉發。Sink最後接收到LatencyMarks後,將比較LatencyMarker的時間戳與當前系統時間,以確定延遲。
LatencyMarker不會增加作業的延遲,但是LatencyMarker與常規記錄類似,可以被delay阻塞(例如反壓情況),因此LatencyMarker的延遲與Record延遲近似。
上述建議期望所有任務管理器TaskManager上的時鐘是同步的。否則,測量的延遲也包括TaskManager時鐘之間的偏移。
後續,我們可以嘗試通過使用JobManager作為計時服務中心(central timing service)來緩解這個問題。taskmanager將定期查詢JM的當前時間,以確定其時鐘的偏移量。
這個偏移量仍然包括TM和JM之間的網絡延遲,但是仍然比較好的測量時延。
五、Flink LatencyMarker實現源碼
本章節對應到pr源碼pull-2386的實現,這裡簡要說明。
Flink源碼中,引入了一個新的StreamElement,稱為LatencyMarker。
與水印類似,LatencyMarker按配置的間隔從源發出。這個時間間隔的默認值是0毫秒,即不觸發 (配置項在ExecutionConfig#latencyTrackingInterval,名稱metrics.latency.interval),例如可以配置成2000毫秒觸發一次LatencyMarker發送。
LatencyMarker不能“多於”常規元素。這確保了測量的延遲接近於常規流元素的端到端延遲。
常規操作符Operator(不包括那些參與迭代的Operator)如果不是sink,就會轉發延遲標記LatencyMarker。
具有多個輸出channel的Operator,隨機選擇一個channel通道,將LatencyMarker發送給它。這可以確保每個LatencyMarker標記在系統中只存在一次,並且重新分區步驟不會導致傳輸的LatencyMarker數量激增。
<code>public class RecordWriterOutput{/<code><code> @Override/<code><code> public void emitLatencyMarker(LatencyMarker latencyMarker) {/<code><code> serializationDelegate.setInstance(latencyMarker);/<code>
<code>try {/<code><code> // 內部實現了隨機選擇通道/<code><code> recordWriter.randomEmit(serializationDelegate);/<code><code> }/<code><code> catch (Exception e) {/<code><code> throw new RuntimeException(e.getMessage, e);/<code><code> }/<code><code> }/<code><code>}/<code>
上述RecordWriterOutput#emitLatencyMarker會被StreamSource、AbstractStreamOperator調用,分別實現source和中間operator的延遲標記下發
如果操作符Operator是Sink,它將維護每個已知source實例的最後512個LatencyMarker信息。
每個已知source的最小/最大/平均值/p50/p95/p99時延,在sink的LatencyStats對象中,進行彙總(如果沒有任何輸出的Operator,就是是sink)。
此pr代碼,不會在web ui中顯示延遲。此外,目前還沒有確保系統時鐘同步的機制,因此如果硬件時鐘不正確,則延遲測量將不準確。
六、總結說明
1、LatencyMarker不參與window、MiniBatch的緩存計時,直接被中間Operator下發。
2、Metric路徑:TaskManagerJobMetricGroup/operator_id/operator_subtask_index/latency
3、每個中間Operator、以及Sink都會統計自己與Source節點的鏈路延遲,我們在監控頁面,一般展示Source至Sink鏈路延遲。
4、延遲粒度細分到Task,可以用來排查哪臺機器的Task時延偏高,進行對比和運維排查。
5、從實現原理來看,發送時延標記間隔配置大一些(例如20秒一次),一般不會影響系統處理業務數據的性能。
END
閱讀更多 zhisheng的blog 的文章