深入瞭解 Apache Flink 的網絡協議棧

Flink 的網絡協議棧是組成 flink-runtime 模塊的核心組件之一,是每個 Flink 作業的核心。它連接所有 TaskManager 的各個子任務(Subtask),因此,對於 Flink 作業的性能包括吞吐與延遲都至關重要。與 TaskManager 和 JobManager 之間通過基於 Akka 的 RPC 通信的控制通道不同,TaskManager 之間的網絡協議棧依賴於更加底層的 Netty API。

本文將首先介紹 Flink 暴露給流算子(Stream operator)的高層抽象,然後詳細介紹 Flink 網絡協議棧的物理實現和各種優化、優化的效果以及 Flink 在吞吐量和延遲之間的權衡。

1.邏輯視圖

Flink 的網絡協議棧為彼此通信的子任務提供以下邏輯視圖,例如在 A 通過 keyBy() 操作進行數據 Shuffle :

原理解析 | 深入瞭解 Apache Flink 的網絡協議棧

這一過程建立在以下三種基本概念的基礎上:

▼ 子任務輸出類型(ResultPartitionType):

  • Pipelined(有限的或無限的):一旦產生數據就可以持續向下遊發送有限數據流或無限數據流。
  • Blocking:僅在生成完整結果後向下游發送數據。

▼ 調度策略:

  • 同時調度所有任務(Eager):同時部署作業的所有子任務(用於流作業)。
  • 上游產生第一條記錄部署下游(Lazy):一旦任何生產者生成任何輸出,就立即部署下游任務。
  • 上游產生完整數據部署下游:當任何或所有生產者生成完整數據後,部署下游任務。

▼ 數據傳輸:

  • 高吞吐:Flink 不是一個一個地發送每條記錄,而是將若干記錄緩衝到其網絡緩衝區中並一次性發送它們。這降低了每條記錄的發送成本因此提高了吞吐量。
  • 低延遲:當網絡緩衝區超過一定的時間未被填滿時會觸發超時發送,通過減小超時時間,可以通過犧牲一定的吞吐來獲取更低的延遲。

我們將在下面深入 Flink 網絡協議棧的物理實現時看到關於吞吐延遲的優化。對於這一部分,讓我們詳細說明輸出類型與調度策略。首先,需要知道的是子任務的輸出類型和調度策略是緊密關聯的,只有兩者的一些特定組合才是有效的。

Pipelined 結果是流式輸出,需要目標 Subtask 正在運行以便接收數據。因此需要在上游 Task 產生數據之前或者產生第一條數據的時候調度下游目標 Task 運行。批處理作業生成有界結果數據,而流式處理作業產生無限結果數據。

批處理作業也可能以阻塞方式產生結果,具體取決於所使用的算子和連接模式。在這種情況下,必須等待上游 Task 先生成完整的結果,然後才能調度下游的接收 Task 運行。這能夠提高批處理作業的效率並且佔用更少的資源。

下表總結了 Task 輸出類型以及調度策略的有效組合:

原理解析 | 深入瞭解 Apache Flink 的網絡協議棧

註釋:

[1]目前 Flink 未使用

[2]批處理 / 流計算統一完成後,可能適用於流式作業

此外,對於具有多個輸入的子任務,調度以兩種方式啟動:當所有或者任何上游任務產生第一條數據或者產生完整數據時調度任務運行。要調整批處理作業中的輸出類型和調度策略,可以參考 ExecutionConfig#setExecutionMode()——尤其是 ExecutionMode,以及 ExecutionConfig#setDefaultInputDependencyConstraint()。

2.物理數據傳輸

為了理解物理數據連接,請回想一下,在 Flink 中,不同的任務可以通過 Slotsharing group 共享相同 Slot。TaskManager 還可以提供多個 Slot,以允許將同一任務的多個子任務調度到同一個 TaskManager 上。

對於下圖所示的示例,我們假設 2 個併發為 4 的任務部署在 2 個 TaskManager 上,每個 TaskManager 有兩個 Slot。TaskManager 1 執行子任務 A.1,A.2,B.1 和 B.2,TaskManager 2 執行子任務 A.3,A.4,B.3 和 B.4。在 A 和 B 之間是 Shuffle 連接類型,比如來自於 A 的 keyBy() 操作,在每個 TaskManager 上會有 2x4 個邏輯連接,其中一些是本地的,另一些是遠程的:

原理解析 | 深入瞭解 Apache Flink 的網絡協議棧

不同任務(遠程)之間的每個網絡連接將在 Flink 的網絡堆棧中獲得自己的 TCP 通道。但是,如果同一任務的不同子任務被調度到同一個 TaskManager,則它們與同一個 TaskManager 的網絡連接將多路複用並共享同一個 TCP 信道以減少資源使用。在我們的例子中,這適用於 A.1→B.3,A.1→B.4,以及 A.2→B.3 和 A.2→B.4,如下圖所示:

原理解析 | 深入瞭解 Apache Flink 的網絡協議棧

每個子任務的輸出結果稱為 ResultPartition,每個 ResultPartition 被分成多個單獨的 ResultSubpartition- 每個邏輯通道一個。Flink 的網絡協議棧在這一點的處理上,不再處理單個記錄,而是將一組序列化的記錄填充到網絡緩衝區中進行處理。每個子任務本地緩衝區中最多可用 Buffer 數目為(每個發送方和接收方各一個):

#channels * buffers-per-channel + floating-buffers-per-gate

單個 TaskManager 上的網絡層 Buffer 總數通常不需要配置。有關如何在需要時進行配置的詳細信息,請參閱配置網絡緩衝區的文檔。

造成反壓(1)

每當子任務的數據發送緩衝區耗盡時——數據駐留在 Subpartition 的緩衝區隊列中或位於更底層的基於 Netty 的網絡堆棧內,生產者就會被阻塞,無法繼續發送數據,而受到反壓。接收端以類似的方式工作:Netty 收到任何數據都需要通過網絡 Buffer 傳遞給 Flink。如果相應子任務的網絡緩衝區中沒有足夠可用的網絡 Buffer,Flink 將停止從該通道讀取,直到 Buffer 可用。這將反壓該多路複用上的所有發送子任務,因此也限制了其他接收子任務。下圖說明了過載的子任務 B.4,它會導致多路複用的反壓,也會導致子任務 B.3 無法接受和處理數據,即使是 B.3 還有足夠的處理能力。

原理解析 | 深入瞭解 Apache Flink 的網絡協議棧

為了防止這種情況發生,Flink 1.5 引入了自己的流量控制機制。

3.Credit-based 流量控制

Credit-based 流量控制可確保發送端已經發送的任何數據,接收端都具有足夠的能力(Buffer)來接收。新的流量控制機制基於網絡緩衝區的可用性,作為 Flink 之前機制的自然延伸。每個遠程輸入通道(RemoteInputChannel)現在都有自己的一組獨佔緩衝區(Exclusive buffer),而不是隻有一個共享的本地緩衝池(LocalBufferPool)。與之前不同,本地緩衝池中的緩衝區稱為流動緩衝區(Floating buffer),因為它們會在輸出通道間流動並且可用於每個輸入通道。

數據接收方會將自身的可用 Buffer 作為 Credit 告知數據發送方(1 buffer = 1 credit)。每個 Subpartition 會跟蹤下游接收端的 Credit(也就是可用於接收數據的 Buffer 數目)。只有在相應的通道(Channel)有 Credit 的時候 Flink 才會向更底層的網絡協議棧發送數據(以 Buffer 為粒度),並且每發送一個 Buffer 的數據,相應的通道上的 Credit 會減 1。除了發送數據本身外,數據發送端還會發送相應 Subpartition 中有多少正在排隊發送的 Buffer 數(稱之為 Backlog)給下游。數據接收端會利用這一信息(Backlog)去申請合適數量的 Floating buffer 用於接收發送端的數據,這可以加快發送端堆積數據的處理。接收端會首先申請和 Backlog 數量相等的 Buffer,但可能無法申請到全部,甚至一個都申請不到,這時接收端會利用已經申請到的 Buffer 進行數據接收,並監聽是否有新的 Buffer 可用。

原理解析 | 深入瞭解 Apache Flink 的網絡協議棧

Credit-based 的流控使用 Buffers-per-channel 來指定每個 Channel 有多少獨佔的 Buffer,使用 Floating-buffers-per-gate 來指定共享的本地緩衝池(Local buffer pool)大小(可選3),通過共享本地緩衝池,Credit-based 流控可以使用的 Buffer 數目可以達到與原來非 Credit-based 流控同樣的大小。這兩個參數的默認值是被精心選取的,以保證新的 Credit-based 流控在網絡健康延遲正常的情況下至少可以達到與原策略相同的吞吐。可以根據實際的網絡 RRT (round-trip-time)和帶寬對這兩個參數進行調整。

註釋3:如果沒有足夠的 Buffer 可用,則每個緩衝池將獲得全局可用 Buffer 的相同份額(±1)。

造成反壓(2)

與沒有流量控制的接收端反壓機制不同,Credit 提供了更直接的控制:如果接收端的處理速度跟不上,最終它的 Credit 會減少成 0,此時發送端就不會在向網絡中發送數據(數據會被序列化到 Buffer 中並緩存在發送端)。由於反壓只發生在邏輯鏈路上,因此沒必要阻斷從多路複用的 TCP 連接中讀取數據,也就不會影響其他的接收者接收和處理數據。

Credit-based 的優勢與問題

由於通過 Credit-based 流控機制,多路複用中的一個信道不會由於反壓阻塞其他邏輯信道,因此整體資源利用率會增加。此外,通過完全控制正在發送的數據量,我們還能夠加快 Checkpoint alignment:如果沒有流量控制,通道需要一段時間才能填滿網絡協議棧的內部緩衝區並表明接收端不再讀取數據了。在這段時間裡,大量的 Buffer 不會被處理。任何 Checkpoint barrier(觸發 Checkpoint 的消息)都必須在這些數據 Buffer 後排隊,因此必須等到所有這些數據都被處理後才能夠觸發 Checkpoint(“Barrier 不會在數據之前被處理!”)。

但是,來自接收方的附加通告消息(向發送端通知 Credit)可能會產生一些額外的開銷,尤其是在使用 SSL 加密信道的場景中。此外,單個輸入通道( Input channel)不能使用緩衝池中的所有 Buffer,因為存在無法共享的 Exclusive buffer。新的流控協議也有可能無法做到立即發送儘可能多的數據(如果生成數據的速度快於接收端反饋 Credit 的速度),這時則可能增長髮送數據的時間。雖然這可能會影響作業的性能,但由於其所有優點,通常新的流量控制會表現得更好。可能會通過增加單個通道的獨佔 Buffer 數量,這會增大內存開銷。然而,與先前實現相比,總體內存使用可能仍然會降低,因為底層的網絡協議棧不再需要緩存大量數據,因為我們總是可以立即將其傳輸到 Flink(一定會有相應的 Buffer 接收數據)。

在使用新的 Credit-based 流量控制時,可能還會注意到另一件事:由於我們在發送方和接收方之間緩衝較少的數據,反壓可能會更早的到來。然而,這是我們所期望的,因為緩存更多數據並沒有真正獲得任何好處。如果要緩存更多的數據並且保留 Credit-based 流量控制,可以考慮通過增加單個輸入共享 Buffer 的數量。

原理解析 | 深入瞭解 Apache Flink 的網絡協議棧

原理解析 | 深入瞭解 Apache Flink 的網絡協議棧

注意:如果需要關閉 Credit-based 流量控制,可以將這個配置添加到 flink-conf.yaml 中:taskmanager.network.credit-model:false。但是,此參數已過時,最終將與非 Credit-based 流控制代碼一起刪除。

4.序列號與反序列化

下圖從上面的擴展了更高級別的視圖,其中包含網絡協議棧及其周圍組件的更多詳細信息,從發送算子發送記錄(Record)到接收算子獲取它:

原理解析 | 深入瞭解 Apache Flink 的網絡協議棧

在生成 Record 並將其傳遞出去之後,例如通過 Collector#collect(),它被傳遞給 RecordWriter,RecordWriter 會將 Java 對象序列化為字節序列,最終存儲在 Buffer 中按照上面所描述的在網絡協議棧中進行處理。RecordWriter 首先使用 SpanningRecordSerializer 將 Record 序列化為靈活的堆上字節數組。然後,它嘗試將這些字節寫入目標網絡 Channel 的 Buffer 中。我們將在下面的章節回到這一部分。

在接收方,底層網絡協議棧(Netty)將接收到的 Buffer 寫入相應的輸入通道(Channel)。流任務的線程最終從這些隊列中讀取並嘗試在 RecordReader 的幫助下通過 SpillingAdaptiveSpanningRecordDeserializer 將累積的字節反序列化為 Java 對象。與序列化器類似,這個反序列化器還必須處理特殊情況,例如跨越多個網絡 Buffer 的 Record,或者因為記錄本身比網絡緩衝區大(默認情況下為32KB,通過 taskmanager.memory.segment-size 設置)或者因為序列化 Record 時,目標 Buffer 中已經沒有足夠的剩餘空間保存序列化後的字節數據,在這種情況下,Flink 將使用這些字節空間並繼續將其餘字節寫入新的網絡 Buffer 中。

4.1 將網絡 Buffer 寫入 Netty

在上圖中,Credit-based 流控制機制實際上位於“Netty Server”(和“Netty Client”)組件內部,RecordWriter 寫入的 Buffer 始終以空狀態(無數據)添加到 Subpartition 中,然後逐漸向其中填寫序列化後的記錄。但是 Netty 在什麼時候真正的獲取併發送這些 Buffer 呢?顯然,不能是 Buffer 中只要有數據就發送,因為跨線程(寫線程與發送線程)的數據交換與同步會造成大量的額外開銷,並且會造成緩存本身失去意義(如果是這樣的話,不如直接將將序列化後的字節發到網絡上而不必引入中間的 Buffer)。

在 Flink 中,有三種情況可以使 Netty 服務端使用(發送)網絡 Buffer:

  • 寫入 Record 時 Buffer 變滿,或者
  • Buffer 超時未被髮送,或
  • 發送特殊消息,例如 Checkpoint barrier。

▼ 在 Buffer 滿後發送

RecordWriter 將 Record 序列化到本地的序列化緩衝區中,並將這些序列化後的字節逐漸寫入位於相應 Result subpartition 隊列中的一個或多個網絡 Buffer中。雖然單個 RecordWriter 可以處理多個 Subpartition,但每個 Subpartition 只會有一個 RecordWriter 向其寫入數據。另一方面,Netty 服務端線程會從多個 Result subpartition 中讀取並像上面所說的那樣將數據寫入適當的多路複用信道。這是一個典型的生產者 - 消費者模式,網絡緩衝區位於生產者與消費者之間,如下圖所示。在(1)序列化和(2)將數據寫入 Buffer 之後,RecordWriter 會相應地更新緩衝區的寫入索引。一旦 Buffer 完全填滿,RecordWriter 會(3)為當前 Record 剩餘的字節或者下一個 Record 從其本地緩衝池中獲取新的 Buffer,並將新的 Buffer 添加到相應 Subpartition 的隊列中。這將(4)通知 Netty服務端線程有新的數據可發送(如果 Netty 還不知道有可用的數據的話4)。每當 Netty 有能力處理這些通知時,它將(5)從隊列中獲取可用 Buffer 並通過適當的 TCP 通道發送它。

原理解析 | 深入瞭解 Apache Flink 的網絡協議棧

註釋4:如果隊列中有更多已完成的 Buffer,我們可以假設 Netty 已經收到通知。

▼ 在 Buffer 超時後發送

為了支持低延遲應用,我們不能只等到 Buffer 滿了才向下遊發送數據。因為可能存在這種情況,某種通信信道沒有太多數據,等到 Buffer 滿了在發送會不必要地增加這些少量 Record 的處理延遲。因此,Flink 提供了一個定期 Flush 線程(the output flusher)每隔一段時間會將任何緩存的數據全部寫出。可以通過 StreamExecutionEnvironment#setBufferTimeout 配置 Flush 的間隔,並作為延遲5的上限(對於低吞吐量通道)。下圖顯示了它與其他組件的交互方式:RecordWriter 如前所述序列化數據並寫入網絡 Buffer,但同時,如果 Netty 還不知道有數據可以發送,Output flusher 會(3,4)通知 Netty 服務端線程數據可讀(類似與上面的“buffer已滿”的場景)。當 Netty 處理此通知(5)時,它將消費(獲取併發送)Buffer 中的可用數據並更新 Buffer 的讀取索引。Buffer 會保留在隊列中——從 Netty 服務端對此 Buffer 的任何進一步操作將在下次從讀取索引繼續讀取。

原理解析 | 深入瞭解 Apache Flink 的網絡協議棧

註釋5:嚴格來說,Output flusher 不提供任何保證——它只向 Netty 發送通知,而 Netty 線程會按照能力與意願進行處理。這也意味著如果存在反壓,則 Output flusher 是無效的。

▼ 特殊消息後發送

一些特殊的消息如果通過 RecordWriter 發送,也會觸發立即 Flush 緩存的數據。其中最重要的消息包括 Checkpoint barrier 以及 end-of-partition 事件,這些事件應該儘快被髮送,而不應該等待 Buffer 被填滿或者 Output flusher 的下一次 Flush。

▼ 進一步的討論

與小於 1.5 版本的 Flink 不同,請注意(a)網絡 Buffer 現在會被直接放在 Subpartition 的隊列中,(b)網絡 Buffer 不會在 Flush 之後被關閉。這給我們帶來了一些好處:

  • 同步開銷較少(Output flusher 和 RecordWriter 是相互獨立的)
  • 在高負荷情況下,Netty 是瓶頸(直接的網絡瓶頸或反壓),我們仍然可以在未完成的 Buffer 中填充數據
  • Netty 通知顯著減少

但是,在低負載情況下,可能會出現 CPU 使用率和 TCP 數據包速率的增加。這是因為,Flink 將使用任何可用的 CPU 計算能力來嘗試維持所需的延遲。一旦負載增加,Flink 將通過填充更多的 Buffer 進行自我調整。由於同步開銷減少,高負載場景不會受到影響,甚至可以實現更高的吞吐。

4.2 BufferBuilder 和 BufferConsumer

更深入地瞭解 Flink 中是如何實現生產者 - 消費者機制,需要仔細查看 Flink 1.5 中引入的 BufferBuilder 和 BufferConsumer 類。雖然讀取是以 Buffer 為粒度,但寫入它是按 Record 進行的,因此是 Flink 中所有網絡通信的核心路徑。因此,我們需要在任務線程(Task thread)和 Netty 線程之間實現輕量級連接,這意味著儘量小的同步開銷。你可以通過查看源代碼獲取更加詳細的信息。

5. 延遲與吞吐

引入網絡 Buffer 的目是獲得更高的資源利用率和更高的吞吐,代價是讓 Record 在 Buffer 中等待一段時間。雖然可以通過 Buffer 超時給出此等待時間的上限,但可能很想知道有關這兩個維度(延遲和吞吐)之間權衡的更多信息,顯然,無法兩者同時兼得。下圖顯示了不同的 Buffer 超時時間下的吞吐,超時時間從 0 開始(每個 Record 直接 Flush)到 100 毫秒(默認值),測試在具有 100 個節點每個節點 8 個 Slot 的群集上運行,每個節點運行沒有業務邏輯的 Task 因此只用於測試網絡協議棧的能力。為了進行比較,我們還測試了低延遲改進(如上所述)之前的 Flink 1.4 版本。

原理解析 | 深入瞭解 Apache Flink 的網絡協議棧

如圖,使用 Flink 1.5+,即使是非常低的 Buffer 超時(例如1ms)(對於低延遲場景)也提供高達超時默認參數(100ms)75% 的最大吞吐,但會緩存更少的數據。

6.結論

瞭解 Result partition,批處理和流式計算的不同網絡連接以及調度類型,Credit-Based 流量控制以及 Flink 網絡協議棧內部的工作機理,有助於更好的理解網絡協議棧相關的參數以及作業的行為。後續我們會推出更多 Flink 網絡棧的相關內容,並深入更多細節,包括運維相關的監控指標(Metrics),進一步的網絡調優策略以及需要避免的常見錯誤等。

Tips:文中綠色文字部分均有跳轉,點擊【閱讀原文】,查看原版!

via:

https://flink.apache.org/2019/06/05/flink-network-stack.html

翻譯:曹英傑


分享到:


相關文章: