10張圖帶你入門Flink分佈式運行時

本文將以WordCount的案例為主線,主要介紹Flink的設計和運行原理。關於Flink WordCount程序可以參考我之前的文章: 。閱讀完本文後,讀者可以對Flink的分佈式運行時有一個全面的認識。

1 Flink數據流圖簡介

1.1 Flink作業的邏輯視圖

在大數據領域,詞頻統計(WordCount)程序就像是一個編程語言的HelloWorld程序,它展示了一個大數據引擎的基本規範。麻雀雖小,五臟俱全,從這個樣例中,我們可以一窺Flink設計和運行原理。

10張圖帶你入門Flink分佈式運行時

圖 1 Flink樣例程序示意圖

如圖 1所示,程序分為三大部分,第一部分讀取數據源(Source),第二部分對數據做轉換操作(Transformation),最後將轉換結果輸出到一個目的地(Sink)。 代碼中的方法被稱為算子(Operator),是Flink提供給程序員的接口,程序員需要通過這些算子對數據進行操作。Source算子讀取數據源中的數據,數據源可以是數據流、也可以存儲在文件系統中的文件。Transformation算子對數據進行必要的計算處理。Sink算子將處理結果輸出,數據一般被輸出到數據庫、文件系統或下一個數據流程序。

我們可以把算子理解為1 + 2 運算中的加號,加號(+)是這個算子的一個符號表示,它表示對數字1和數字2做加法運算。同樣,在Flink或Spark這樣的大數據引擎中,算子對數據進行某種操作,程序員可以根據自己的需求調用合適的算子,完成所需計算任務。常用的算子有map、flatMap、keyBy、timeWindow等,它們分別對數據流執行不同類型的操作。

我們先對這個樣例程序中各個算子做一個簡單的介紹,關於這些算子的具體使用方式將在後續文章中詳細說明。

  • flatMap

flatMap對輸入進行處理,生成零到多個輸出。這裡是一個簡單的分詞過程,對一行字符串按照空格切分,生成一個(word, 1)的二元組。

  • keyBy

keyBy根據某個Key對數據重新分組。本例中是將flatMap生成的二元組(word, 1)中第一項作為Key,相同的單詞會被分到同一組。

  • timeWindow

timeWindow是時間窗口函數,用來界定對多長時間之內的數據做統計。

  • sum

sum為求和函數。sum(1)表示對二元組中第二個元素求和,因為經過前面的keyBy,所有相同的單詞都被分到了一起,因此,在這個分組內,將單詞出現次數做加和,就得到出現的總次數。

10張圖帶你入門Flink分佈式運行時

圖 2 WordCont程序的邏輯視圖

在程序實際運行前,Flink會將用戶編寫的代碼做一個簡單處理,生成一個如圖2所示的邏輯視圖。圖 2展示了WordCount程序中,數據從不同算子間流動的情況。圖中,圓圈代表算子,圓圈間的箭頭代表數據流,數據流在Flink程序中經過不同算子的計算,最終生成為目標數據。其中,keyBy、timeWindow和sum共同組成了一個時間窗口上的聚合操作,被歸結為一個算子。我們可以在Flink的Web UI中,點擊一個作業,查看這個作業的邏輯視圖。

對於詞頻統計這個案例,邏輯上來講無非是對數據流中的單詞做提取,然後使用一個Key-Value結構對單詞做詞頻計數,最後輸出結果即可,這樣的邏輯本可以用幾行代碼完成,改成使用算子形式,反而讓新人看著一頭霧水,為什麼一定要用算子的形式來寫程序呢?實際上,算子進化成當前這個形態,就像人類從石塊計數,到手指計數,到算盤計數,再到計算機計數這樣的進化過程一樣,儘管更低級的方式可以完成一定的計算任務,但是隨著計算規模的增長,古老的計數方式存在著低效的弊端,無法完成更高級別和更大規模的計算需求。試想,如果我們不使用大數據引擎提供的算子,而是自己實現一套上述的計算邏輯,儘管我們可以快速完成當前的詞頻統計的任務,但是當面臨一個新計算任務時,我們需要重新編寫程序,完成一整套計算任務。我們自己編寫代碼的橫向擴展性可能很低,當輸入數據暴增時,我們需要做很大改動,以部署在更多機器上。

大數據引擎的算子對計算做了一些抽象,對於新人來說有一定學習成本,而一旦掌握這門技術,人們所能處理的數據規模將成倍增加。大數據引擎的算子出現,正是針對數據分佈在多個節點的大數據場景下,需要一種統一的計算描述語言來對數據做計算而進化出的新計算形態。基於Flink的算子,我們可以定義一個數據流的邏輯視圖,以此完成對大數據的計算。剩下那些數據交換、橫向擴展、故障恢復等問題全交由大數據引擎來解決。

1.2 從邏輯視圖到物理執行

在絕大多數的大數據處理場景下,一臺機器節點無法處理所有數據,數據被切分到多臺節點上。在大數據領域,當數據量大到超過單臺機器處理能力時,需要將一份數據切分到多個分區(Partition)上,每個分區分佈在一臺虛擬機或物理機上。

前一小節已經提到,大數據引擎的算子提供了編程接口,我們可以使用算子構建數據流的邏輯視圖。考慮到數據分佈在多個節點的情況,邏輯視圖只是一種抽象,需要將邏輯視圖轉化為物理執行圖,才能在分佈式環境下執行。

10張圖帶你入門Flink分佈式運行時

圖 3 樣例程序物理執行示意圖

圖 3為WordCount程序的物理執行圖,這裡數據流分佈在2個分區上。箭頭部分表示數據流分區,圓圈部分表示算子在分區上的算子子任務(Operator Subtask)。從邏輯視圖變為物理執行圖後,FlatMap算子在每個分區都有一個算子子任務,以處理該分區上的數據:FlatMap[1/2]算子子任務處理第一個數據流分區上的數據,以此類推。

算子子任務又被稱為算子實例,一個算子在並行執行時,會有多個算子實例。即使輸入數據增多,我們也可以通過部署更多的算子實例來進行橫向擴展。從圖 3中可以看到,除去Sink外的算子都被分成了2個算子實例,他們的並行度(Parallelism)為2,Sink算子的並行度為1。並行度是可以被設置的,當設置某個算子的並行度為2時,也就意味著有這個算子有2個算子子任務(或者說2個算子實例)並行執行。實際應用中一般根據輸入數據量的大小,計算資源的多少等多方面的因素來設置並行度。

注意,在本例中,為了演示,我們把所有算子的並行度設置為了2:env.setParallelism(2);,把最後輸出的並行度設置成了1:wordCount.print().setParallelism(1);。如果不單獨設置print的並行度的話,它的並行度也是2。

算子子任務是Flink物理執行的基本單元,算子子任務之間是相互獨立的,某個算子子任務有自己的線程,不同算子子任務可能分佈在不同的節點上。後文在Flink的資源分配部分我們還會重點介紹算子子任務。

1.3 數據交換策略

圖 3中出現了數據流動的現象,即數據在不同的算子子任務上進行著數據交換。無論是Hadoop、Spark還是Flink,都都會涉及到數據交換策略。常見的據交換策略有4種,如圖 4所示。

10張圖帶你入門Flink分佈式運行時

圖 4 Flink數據交換策略

  1. 前向傳播(Forward):前一個算子子任務將數據直接傳遞給後一個算子子任務,數據不存在跨分區的交換,也避免了因數據交換產生的各類開銷,圖 3中Source和和FlatMap之間就是這樣的情形。
  2. 按Key分組(Key-Based):數據以(Key, Value)形式存在,該策略將所有數據進行分組,相同Key的數據會被分到一組,發送到同一個分區上。WordCount程序中,keyBy將單詞作為Key,把相同單詞都發送到同一分區,以方便後續算子的聚合統計。
  3. 廣播(Broadcast):將某份數據發送到所有分區上,這種策略涉及到了數據在全局的拷貝,因此非常消耗資源。
  4. 隨機策略(Random):該策略將所有數據隨機均勻地發送到多個分區上,以保證數據平均分配到不同分區上。該策略通常為了防止數據傾斜到某些分區,導致部分分區數據稀疏,另外一些分區數據擁堵。

2 Flink架構與核心組件

為了實現支持分佈式運行,Flink跟其他大數據引擎一樣,採用了主從(Master-Worker)架構,運行時主要包括兩個組件:

• Master是一個Flink作業的主進程。它起到了協調管理的作用。

• TaskManager,又被稱為Worker或Slave,是執行計算任務的進程。它擁有CPU、內存等計算資源。Flink作業需要將計算任務分發到多個TaskManager上並行執行。

下面將從作業執行層面來分析Flink各個模塊如何工作。

2.1 Flink作業執行過程

Flink為適應不同的基礎環境(獨立集群、YARN、Kubernetes),在不斷的迭代開發過程中已經逐漸形成了一個作業執行流程。不同的基礎環境對計算資源的管理方式略有不同,不過都大同小異,這裡以獨立集群(Standalone)為例,分析作業的分佈式執行流程。Standalone模式指Flink獨佔該集群,集群上無其他任務,如Spark、MapReduce等。

10張圖帶你入門Flink分佈式運行時

圖 5 Flink作業提交流程

在一個作業提交前,Master和TaskManager等進程需要先被啟動。我們可以在Flink主目錄中執行腳本來啟動這些進程:bin/start-cluster.sh。Master和TaskManager被啟動後,TaskManager需要將自己註冊給Master中的ResourceManager。這個初始化和資源註冊過程發生在單個作業提交前,我們稱之為第0步。

接下來我們逐步分析一個Flink作業如何被提交:

  • 用戶編寫應用程序代碼,並通過Flink客戶端(Client)提交作業。程序一般為Java或Scala語言,調用Flink API,構建基於邏輯視角的數據流圖,代碼和相關配置文件被編譯打包,被提交到Master的Dispatcher,形成一個應用作業(Application)。
  • Dispatcher接收到這個作業,啟動JobManager,這個JobManager會負責本次作業。
  • JobManager向ResourceManager申請本次作業所需資源。
  • 由於在第0步中TaskManager已經向ResourceManager中註冊了資源,這時閒置的TaskManager會被反饋給JobManager。
  • JobManager將用戶作業中的邏輯視圖轉化為圖3所示的並行化的物理執行圖,將計算任務分發部署到多個TaskManager上。至此,一個Flink作業就開始執行了。

TaskManager在執行計算任務過程中可能會與其他TaskManager交換數據,會使用圖 4提到的一些數據交換策略。同時,TaskManager也會將一些任務狀態信息會反饋給JobManager,這些信息包括任務啟動、運行或終止的狀態,快照的元數據等。

我們再對涉及到的各個組件進行更為詳細的介紹。

Client

用戶一般使用客戶端(Client)提交作業,比如Flink主目錄下的bin目錄中提供的命令行工具。Client會對用戶提交的Flink程序進行預處理,並把作業提交到Flink集群上。Client提交作業時需要配置一些必要的參數,比如使用Standalone還是YARN集群等。整個作業被打成了Jar包,DataStream API被轉換成了JobGraph,JobGraph是一種類似圖2的邏輯視圖。

Dispatcher

Dispatcher可以接收多個作業,每接收一個作業,Dispatcher都會為這個作業分配一個JobManager。Dispatcher對外提供一個REST式的接口,以HTTP的形式來對外提供服務。

JobManager

JobManager是單個Flink作業的協調者,一個作業會有一個JobManager來負責。JobManager會將Client提交的JobGraph轉化為ExceutionGraph,ExecutionGraph是類似圖3所示的可並行的物理執行圖。JobManager會向ResourceManager申請必要的資源,當獲取足夠的資源後,JobManager將ExecutionGraph以及具體的計算任務分發部署到多個TaskManager上。同時,JobManager還負責管理多個TaskManager,這包括:收集作業的狀態信息,生成檢查點,必要時進行故障恢復等問題。

ResourceManager

如前文所說,Flink現在可以部署在Standalone、YARN或Kubernetes等環境上,不同環境中對計算資源的管理模式略有不同,Flink使用一個名為ResourceManager的模塊來統一處理資源分配上的問題。在Flink中,計算資源的基本單位是TaskManager上的任務槽位(Task Slot,簡稱槽位Slot)。ResourceManager的職責主要是從YARN等資源提供方獲取計算資源,當JobManager有計算需求時,將空閒的Slot分配給JobManager。當計算任務結束時,ResourceManager還會重新收回這些Slot。

TaskManager

TaskManager是實際負責執行計算的節點。一般地,一個Flink作業是分佈在多個TaskManager上執行的,單個TaskManager上提供一定量的Slot。一個TaskManager啟動後,相關Slot信息會被註冊到ResourceManager中。當某個Flink作業提交後,TaskManager會將空閒的Slot信息提供給JobManager。JobManager獲取到空閒Slot信息後會將具體的計算任務部署到該Slot之上,任務開始在這些Slot上執行。在執行過程,由於要進行數據交換,TaskManager還要和其他TaskManager進行必要的數據通信。

總之,TaskManager負責具體計算任務的執行,啟動時它會將資源向ResourceManager註冊。

2.2 再談邏輯視圖到物理執行圖

瞭解了Flink的分佈式架構和核心組件,這裡我們從更細粒度上來介紹從邏輯視圖轉化為物理執行圖過程,該過程可以分成四層:StreamGraph -> JobGraph -> ExecutionGraph -> 物理執行圖。

10張圖帶你入門Flink分佈式運行時

圖 6 WordCount程序數據流圖轉化過程

  • StreamGraph:是根據用戶編寫的代碼生成的最初的圖,用來表示一個Flink作業的拓撲結構。在StreamGraph中,節點StreamNode就是算子。
  • JobGraph:JobGraph是提交給 JobManager 的數據結構。StreamGraph經過優化後生成了JobGraph,主要的優化為,將多個符合條件的節點鏈接在一起作為一個JobVertex節點,這樣可以減少數據交換所需要的傳輸開銷。這個鏈接的過程叫做算子鏈(Operator Chain),會在下一小節繼續介紹。JobVertex經過算子鏈後,會包含一到多個算子,它輸出是IntermediateDataSet,是經過算子處理產生的數據集。
  • ExecutionGraph:JobManager將 JobGraph轉化為ExecutionGraph。ExecutionGraph是JobGraph的並行化版本:假如某個JobVertex的並行度是2,那麼它將被劃分為2個ExecutionVertex,ExecutionVertex表示一個算子子任務,它監控著單個子任務的執行情況。每個ExecutionVertex會輸出一個IntermediateResultPartition,這是單個子任務的輸出,再經過ExecutionEdge輸出到下游節點。ExecutionJobVertex是這些並行子任務的合集,它監控著整個算子的運行情況。ExecutionGraph是調度層非常核心的數據結構。
  • 物理執行圖:JobManager根據ExecutionGraph對作業進行調度後,在各個TaskManager上部署具體的任務,物理執行圖並不是一個具體的數據結構。

可以看到,Flink在數據流圖上可謂煞費苦心,僅各類圖就有四種之多。對於新人來說,可以不用太關心這些非常細節的底層實現,只需要瞭解以下幾個核心概念:

  • Flink採用主從架構,Master起著管理協調作用,TaskManager負責物理執行,在執行過程中會發生一些數據交換、生命週期管理等事情。
  • 用戶調用Flink API,構造邏輯視圖,Flink會對邏輯視圖優化,並轉化為並行化的物理執行圖,最後被執行的是物理執行圖。

2.3 任務、算子子任務與算子鏈

在構造物理執行圖的過程中,Flink會將一些算子子任務鏈接在一起,組成算子鏈。鏈接後以任務(Task)的形式被TaskManager調度執行。使用算子鏈是一個非常有效的優化,它可以有效降低算子子任務之間的傳輸開銷。鏈接之後形成的Task是TaskManager中的一個線程。

10張圖帶你入門Flink分佈式運行時

圖 7 任務、子任務與算子鏈

例如,數據從Source前向傳播到FlatMap,這中間沒有發生跨分區的數據交換,因此,我們完全可以將Source、FlatMap這兩個子任務組合在一起,形成一個Task。數據經過keyBy發生了數據交換,數據會跨越分區,因此無法將keyBy以及其後面的窗口聚合鏈接到一起。由於WindowAggregation的並行度是2,Sink的並行度為1,數據再次發生了交換,我們不能把WindowAggregation和Sink兩部分鏈接到一起。1.2節中提到,Sink的並行度是人為設置為1,如果我們把Sink的並行度也設置為2,那麼是可以讓這兩個算子鏈接到一起的。

默認情況下,Flink會盡量將更多的子任務鏈接在一起,這樣能減少一些不必要的數據傳輸開銷。但一個子任務有超過一個輸入或發生數據交換時,鏈接就無法建立。兩個算子能夠鏈接到一起是有一些規則的,感興趣的讀者可以閱讀Flink源碼中org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator中的isChainable方法。StreamingJobGraphGenerator類的作用是將StreamGraph轉換為JobGraph。

儘管將算子鏈接到一起會降低一些傳輸開銷,但是也有一些情況並不需要太多鏈接。比如,有時候我們需要將一個非常長的算子鏈拆開,這樣我們就可以將原來集中在一個線程中的計算拆分到多個線程中來並行計算。Flink允許開發者手動配置是否啟用算子鏈,或者對哪些算子使用算子鏈。

2.4 任務槽位與計算資源

  • 任務槽位

根據前文的介紹,我們已經瞭解到TaskManager負責具體的任務執行。TaskManager是一個JVM進程,在TaskManager中可以並行運行多個Task。在程序執行之前,經過優化,部分子任務被鏈接在一起,組成一個Task。每個Task是一個線程,需要TaskManager為其分配相應的資源,TaskManager使用任務槽位給Task分配資源。

在解釋Flink任務槽位的概念前,我們先回顧一下進程與線程的概念。在操作系統層面,進程(Process)是進行資源分配和調度的一個獨立單位,線程(Thread)是CPU調度的基本單位。比如,我們常用的Office Word軟件,在啟動後就佔用操作系統的一個進程。Windows上可以使用任務管理器來查看當前活躍的進程,Linux上可以使用top命令來查看。線程是進程的一個子集,一個線程一般專注於處理一些特定任務,不獨立擁有系統資源,只擁有一些運行中必要的資源,如程序計數器。一個進程至少有一個線程,也可以有多個線程。多線程場景下,每個線程都處理一小個任務,多個線程以高併發的方式同時處理多個小任務,可以提高處理能力。

回到Flink的槽位分配機制上,一個TaskManager是一個進程,TaskManager可以管理一至多個Task,每個Task是一個線程,佔用一個槽位。每個槽位的資源是整個TaskManager資源的子集,比如這裡的TaskManager下有3個槽位,每個槽位佔用TaskManager所管理的1/3的內存,第一個槽位中的Task不會與第二個槽位中的Task互相爭搶內存資源。注意,在分配資源時,Flink並沒有將CPU資源明確分配給各個槽位。

10張圖帶你入門Flink分佈式運行時

圖 8 Task Slot與Task Manager

假設我們給WordCount程序分配兩個TaskManager,每個TaskManager又分配3個槽位,所以總共是6個槽位。結合圖 7中對這個作業的並行度設置,整個作業被劃分為5個Task,使用5個線程,這5個線程可以按照圖 8所示的方式分配到6個槽位中。

Flink允許用戶設置TaskManager中槽位的數目,這樣用戶就可以確定以怎樣的粒度將任務做相互隔離。如果每個TaskManager只包含一個槽位,那麼運行在該槽位內的任務將獨享JVM。如果TaskManager包含多個槽位,那麼多個槽位內的任務可以共享JVM資源,比如共享TCP連接、心跳信息、部分數據結構等。官方建議將槽位數目設置為TaskManager下可用的CPU核心數,那麼平均下來,每個槽位都能平均獲得1個CPU核心。

  • 槽位共享

圖 8中展示了任務的一種資源分配方式,默認情況下, Flink還提供了一種槽位共享(Slot Sharing)的優化機制,進一步優化數據傳輸開銷,充分利用計算資源。將圖 8中的任務做槽位共享優化後,結果如圖 9所示。

10張圖帶你入門Flink分佈式運行時

圖 9 槽位共享示意圖

開啟槽位共享後,Flink允許多個任務共享一個槽位。如圖 9中最左側的數據流,一個作業從Source到Sink的所有子任務都可以放置在一個槽位中,這樣數據交換成本更低。而且,對於一個數據流圖來說,Source、FlatMap等算子的計算量相對不大,WindowAggregation算子的計算量比較大,計算量較大的算子子任務與計算量較小的算子子任務可以互補,騰出更多的槽位,分配給更多Task,這樣可以更好地利用資源。如果不開啟槽位共享,如圖8所示,計算量小的Source、FlatMap算子子任務獨佔槽位,造成一定的資源浪費。

10張圖帶你入門Flink分佈式運行時

圖 10 槽位共享後,增大並行度,可以部署更多算子實例

圖 8中的方式共佔用5個槽位,支持槽位共享後,圖 9只佔用2個槽位。為了充分利用空槽位,剩餘的4個空槽位可以分配給別的作業,也可以通過修改並行度來分配給這個作業。例如,這個作業的輸入數據量非常大,我們可以把並行度設為6,更多的算子實例會將這些槽位填充,如圖10所示。

綜上,Flink的一個槽位中可能運行一個算子子任務、也可能是被鏈接的多個子任務,或者是多個子任務共享槽位,具體這個槽位上運行哪些計算由算子鏈和槽位共享兩個優化措施決定。


分享到:


相關文章: