Spark-Core:調度

基本概念

  • Job(作業):RDD中有行動操作所生成一個或者多個調度階段。
  • Stage(調取階段): 一個Job需要拆分成多組任務來完成,每組任務由Stage封裝。與一個Job所涉及的PartitionRDD類似,Stage之間也有依賴關係。
  • Task(任務):一個獨立的工作單元,有Driver Program發送到Executor上去執行。通常情況下,一個Task處理RDD的一個Partition的數據。根據Task返回類型的不同,Task又分為ShuffleMapTask和ResultTask。是Spark執行的最小單元。
  • TaskSet(任務集): 一組任務就是一個TaskSet,對應一個Stage。其中,一個TaskSet的所有Task之間沒有Shuffle依賴,因此互相之間可以並行運行。
  • DAGScheduler(高層調度器):面向調度階段的任務調度器,負責接受Spark應用程序提交的作業,根據RDD的以來關係劃分調度階段,並提交調度階段給TaskScheduler。
  • TaskScheduler(任務調度器):面向任務的調度器,接受DAGScheduler提交的調度階段,然後把任務分發到Worker節點運行,由Worker節點的Executor來運行該任務。

概述

Spark的調度系統用於將用戶提價的“任務”調度到集群中的不同節點執行。

Spark資源調度分為兩層

  • 第一層是Cluster Manager,將資源分配給Application;
  • 第二層是Application,進一步將資源分配給Application的各個Task。

Spark 首先會對提交的Job進行一系列RDD的轉換,並通過RDD的依賴關係構成有向無環圖(Direct Acyclic Graph, DAG)。然後根據RDD依賴的不同將RDD劃分到不同的階段(Stage),每個階段按照分區(Partition)的數量創建多個任務(Task)。最後將這些任務提交到集群的各個節點上運行。

在Spark中最重要的是DAGScheduler和TaskScheduler兩個調度器,其中DAGScheduler負責任務的邏輯調度,講作業拆分成不同階段的具有依賴關係的任務集,而TaskScheduler則負責具體任務的調度執行。

調度系統的主要工作流程如下:

Spark-Core:調度

Spark調度

  • 1)build operator DAG:用戶提交的Job將首先被轉換成一系列RDD並通過RDD之間的關係構建DAG,然後將RDD構成的DAG提交到調度系統。
  • 2)split graph into stages of tasks:DAGScheduler負責接受有RDD構成的DAG,醬油系列RDD劃分到不同的Stage。根據Stage的不同類型(目前有ResultStage和ShuffleStage兩種),給Stage中未完成的Partition創建不同類型的Task(目前有ResultTask和ShuffleMapTask兩種)。每個Stage將因為未完成Partition的多少,創建零到多個Tasl。DAGScheduler最後將每個Stage中的Task以任務集合(TaskSet)的形式提交給TaskScheduler繼續處理。
  • 3)launch tasks via cluster manager:使用集群管理器(cluster manager)分配資源與任務調度,對於失敗的任務還會有一定的重試與容錯機制。TaskSchedler負責從DAGScheduler接受TaskSet,創建TaskSetManager對TaskSet進行管理,並將此TaskSetManager添加到調度池中,最後將對Task的調度交給調度後端接口(SchedulerBackend)處理。SchedulerBackend首先申請TaskShceduler,按照Task調度算法(目前有FIFO和FAIR兩種)對調度池中的所有TaskSetManager進行排序,然後對TaskSet按照最大本地性原則分配資源,最後在各個分配的節點上運行TaskSet中的Task。
  • 4)execute tasks:執行任務,並將任務中間結果和最終結果存入存儲體系。

DAGScheduler

DAGScheduler是面向Stage的高層調度器,DAGScheduler把DAG拆分成很多的Tasks,每組Tasks都是一個Stage,解析時以Shuffle為邊界反向解析構建Stage,每當遇到Shuffle,就會產生新的Stage,然後以一個個TaskSet的形式提交給底層調度器TaskScheduler。DAGScheduler需要記錄哪些RDD被存入磁盤等物化動作,同時要尋求Task的最優化調度。DAGScheduler還需要監視因為Shuffle跨節點輸出可能導致的失敗,如果發現這個Stage失敗,可能就要重新提交該Stage。

DAG將調度提交給DAGScheduler,DAGScheduler調度時會根據是否需要經過Shuffle過程將Job劃分為多個Stage。DAGScheduler的調度過程中,Stage階段的劃分是根據是否有Shuffle過程,也就是當存在ShuffleDependency的寬依賴時,需要進行Shuffle,這是才會將作業(Job)劃分成多個Stage。

TaskScheduler

TaskScheduler的核心任務是提交TaskSet到集群運算並彙報結果。

  • 為TaskSet創建和維護一個TaskSetManager,並跟蹤任務的本地性以及錯誤信息。
  • 遇到Straggle任務時,會放到其他節點進行重試
  • 向DAGScheduler回報執行情況,包括在SHuffle輸出丟失的時候,報告fetch failed錯誤。

TaskSet是一個數據結構,TaskSet包含一系列高層調度器交給底層調度器的任務的集合。


分享到:


相關文章: