【譯】Spark 作業調度

原文地址:https://spark.apache.org/docs/latest/job-scheduling.html

這篇不僅僅是簡單的翻譯,有些比較不容易懂的地方,加了一些備註,另外翻譯已經儘可能的準確,但是也難免有疏漏。

劉彬同學準備寫一系列spark實戰系列,本文是第三篇,翻譯的Spark 作業調度!贊!推薦給大家,希望大家喜歡和支持!

系列文章

SparkContext 初始化內部原理

checkpoint的實現

1 概述

Spark有幾個資源調度之間的工具,首先,回想一下,如集群部署模式概述中所提及的,每個Spark application (SparkContext實例)都會運行在一個獨立的Execute進程中,集群管理器提供了Spark運行跨應用程序的調度功能,第二,每個Spark應用程序,如果它們由不同的線程提交多個“Job”(Spark action)則可能同時運行,如果應用程序通過網絡服務請求,這是很正常的,Spark中包含一個公平調度程序用於在每個SparkContext中資源調度。

2 跨應用調度

當一個集群正在運行的時候,每個Spark應用程序都會獨立的設置一個Executor JVM只會給該應用程序運行任務和存儲數據,如果多個用戶需要共享你的集群,通過集群管理器,它們由不同的選項

來管理分配。

最簡單的選擇,所有集群管理員都可以使用,資源的靜態劃分,通過這種方法,每個應用程序都獲得了它可以使用的最大資源,並在整個過程中都保持這樣,這是在Spark的Standalone和Yarn模式

中使⽤用的⽅方法,以及在粗粒度的Mesos模式,基於集群類型,可以配置集群分配類型:

Standalone mode:默認情況下,應用程序提交在Standalone集群模式中將會以FIFO(先入先出)順序運行,每個應用程序將嘗試所有可⽤用的節點,可以通過配置應⽤用程序配置spark.core.max來限制使用的最大節點數,或者不通過這個設置,改變應用默認的設置spark.deploy.defaultCores;最後,除了控制cores,每個應⽤用程序還可以通過設置spark.executor.memory來控制內存使⽤用。

Mesos:Mesos使用靜態劃分,設置spark.mesos.coarse配置屬性為true,在Standalone模式可以選擇設置spark.cores.max將每個應⽤用程序的資源共享,也可以設置spark.executor.memory來控制內存使用

Yarn:Spark YARN客戶端的 —num-executors選項來控制在集群中分配資源的多少(spark.executor.instances 做為配置屬性)而—executor-memory( spark.executor.memory配置屬性 )和—

executor-cores( spark.executor.cores.配置屬性 )控制每個執行期的資源,更多信息請查看YARN Spark Properties(如果你的spark通過YARN來調度執行,推薦看一下)

Mesos的第二個選項是動態分配CPU內核,在這個模式裡,每個應用程序都有一個固定且獨立的內存分配(通過設置spark.executor.memory),但是當一個應用程序沒有在這個機器上運行,則其它的應用程序可能在這個集群上面分配內核運行任務,當你希望大量不太活躍的應用程序時這種方式是有用的,比如,來自不同用戶的shell會話,然後它帶來的風險就是可預見的延遲,因為當應⽤用程序有工作要做時,它可能需要在等待⼀一段時間才能獲得內核來執行,要使用此模式,簡單實用mesos:// 和 設置 spark.mesos.coarse 為false沒有一種應用程序可以跨應用程序進行內存共享,如果你想通過這個方法共享數據,建議運行一個單獨的服務器程序,該應用程序通過查詢多個RDDs來服務於多個請求動態資源分配

Spark提供了一種機制,可以通過工作負載動態的調整應用程序佔用的資源,這就意味著如果你的應用程序長時間沒有使用和請求那麼資源將會被回收返回給集群,這在多個應用程序在你的集群中

共享資源,這個特性很有用默認情況下,這個特性是禁用的,所有粗粒度的集群管理器都可用,⽐比如:Standalone 模式、Yarn模式和Mesos粗粒度模式都可用。

3 配置和設置

使用這個特性有兩個要求,第一,應用程序必須將spark.dynamicAlloction.enabled設置為true,第二,必須在同一個集群的每個worker節點設置設置一個外部shuffle以及將spark.shuffle.service.enabled設置為true

外部shuffle程序的目的是是允許刪除執行程序而不刪除它們所編寫的文件(下面描述的更詳細),在集群管理器中設置此服務有不同的⽅方式:在Standalone模式下,啟動workers並且將spark.shuffle.service.enable設置為true

在Mesos 粗粒度模式中,運行$SPARK_HOME/sbin/start-mesos-shuffle-service.sh 在所有從節點中並且將spark.shuffle.service.enabled設置為true,例如:可以通過Marathon來完成(Marathon:是mesos的⼀一個框架,支持長任務運行,比如Web服務等)

在YARN模式中,請看說明

譯者補充:

在你的Yarn集群中每個NodeManager啟動Spark Shuffle服務,需要遵循以下說明:

1.編譯Spark與Yarn Profile,如果使用的是pre-packaged版本,可以跳過這一步

2.定位 the spark--yarn-shuffle.jar. 如果你用的是自己編譯的spark並且使⽤用的是分佈式環境的話,這是應該是在 $SPARK_HOME/common/network-yarn/target/scala-

3.在集群所有的NodeManager上面添加這個JAR包到classpath

4.在每個節點上的 yarn-site.xml,添加spark_shuffle到 yarn.nodemanager.aux-services,然後設置yarn.nodemanager.aux-services.spark_shuffle.class

到org.apache.spark.network.yarn.YarnShuffleService.

5.通過在etc/hadoop/yarn-env中設置YARN_HEAPSIZE( 默認為1000 )來增加NodeManager的堆大小,在shuffle過程中避免垃圾回收

6.在集群中重啟所有的NodeManager

4 資源分配策略

在較高級別,在需要的時候,當它們不再使用時,Spark應該放棄Executors,當沒有一個明確的方法去預測即將移除的Executor是否會在將來繼續執行任務,或者將要添加的一組新的執行器是空閒

的,當移除和請求執⾏行器的時候我們需要⼀一組a set of heuristics to determinev

5 請求策略

當一個spark應用程序帶有動態策略,當它有等待被調度的任務時,請求附加執行器,這種情況意味著現有的執行器不足以飽和和提交尚未完成的所有任務

Spark 請求執行器in rounds ,當存在spark.dynamicAllocation.schedulerBacklogTimeout 秒等待任務時,將觸發實際的請求,如果等待隊列依然存在,然後每次觸發spark.dynamicAllocation.sustainedSchedulerBacklogTimeout 此外,每次請求的執行器數量都比上一次指數級,比如,一個應用程序在第一輪添加一個執⾏行器,然後在後續繼續添加了2,4,8個執⾏行器。

指數增加的動機是雙重的,首先,應用程序在開始時應該謹慎的請求執行器,避免發現只有少數幾個執行器可⽤用,這反而成為了TCP慢的理由,其次,應用程序應該及時提高資源使用率,避免實際

需要很多的執行器

6 移除策略

刪除執行器策略要簡單很多,當一個spark應用程序的空閒時間比spark.dynamicAllocation.executorIdleTimeout 要多時,Spark應該刪除執行器,注意,在大多數情況下,這個條件和請求條件是互斥的,在此條件下,如果仍然有未完成的任務,則執⾏行器不應處於空閒狀態

7 優雅的解除執行器

動態分配之前,一個Spark Executor退出失敗或者相關聯的應用程序退出,在這兩種情況下,與執行器相關的所有狀態都不再需要,可以安全的解除,但是,在動態分配情況下,與執行器顯式地刪除時,應用程序仍在運行,如果應用程序試圖訪問被執行人存儲或寫入的狀態,則必須執行重新計算狀態.

因此,Spark需要一種機制,在刪除之前保留它的狀態,從而優雅的移除它.這個條件對於shuffles及其重要,在Shuffle中,Spark Executor⾸首先將自己的Map輸出寫入到磁盤,然後當其它executors試圖獲取這些⽂文件時,它來充當這個文件的服務器,當任務執行緩慢的情況下,動態策略可能在shuffle完成之前刪除一個executor,在這種情況下,由該executor計算的shuffle需要重新計算⼀一遍.

保留shuffle 文件的解決方案是引入外部shuffle服務,在spark1.2引入了,這個服務是一個長時間運行的進程,它運行在集群的每個節點上面,獨立於應用程序和它的執行器,如果啟用了這個服務,

spark executor將從服務中獲取shuffle文件,而不是從彼此間獲取,這表示在executors所寫的所有shuffle狀態都將繼續被記錄在executor的整個生命週期

除了寫shuffle文件之外,executor還可以在磁盤和內存中緩存數據,但是,當刪除一個executor的時候,所有緩存數據就無法訪問,為了避免這種情況,默認的執行器中包含的緩存數據永遠不會刪除,可以配置spark.dynamicAllocation.cachedExecutorIdleTimeout,在以後版本中,緩存的數據可以通過一個類似於storage的堆外內存存儲,這與通過外部shuffle服務保存文件的⽅方式類似。

8 應用程序內調度

在給定的spark應⽤用程序(SparkContext實例)中,如果從單線程中提交多個並行作業,則可以同時並行運行多個作業,通過“Job”,在本節中我們指的是一個Spark動作(比如:save,collect )

和任務需要運行的任務來評估該操作,Spark的調度器是線程安全的,它支持一個實例服務於多個請求的應用程序(比如多個用戶查詢)

默認情況下,Spark的調度器以FIFO方式運行作業,每個工作分為“階段”(比如map和reduce階段),第一個job獲取所有可⽤用的資源,階段任務啟動,二個階段的任務優先執行等等,如果隊列頭

部的作業不需要整個集群,則稍後的作業可以立即開始執行,但是如果隊列頭部的作業很大,則後面的作業可能被延遲

從Spark0.8開始,還可以配置作業間的公平共享,在公平共享下,Spark以“循環型”的方式分配任務之間的任務,這樣所有工作都能得到大致相等的集群資源份額,這意味著,在長時間工作時提交

的短作業可以立即開始執行,而且能得到很好的響應時間,而不必等待長時間的任務,這種模式比較適合多用戶

要啟動公平調度程序,只需在配置SparkContext的時候,設置spark.scheduler.mode 屬性為FAIR :

val conf = new SparkConf().setMaster(...).setAppName(...)

conf.set("spark.scheduler.mode", "FAIR")

val sc = new SparkContext(conf)

9 Fair調度器池

公平調度⽀支持將將作業分組到池中,然後為每個池分配不同的調度選項,這對於為一個重要的job創建一個“高優先級”池非常有用,比如:將每個user的job分組在一起然後給用戶相同的份額,而不用管他們有多少個併發的job,此方法是在Hadoop Fair Scheduler之後建立的。如果沒有任何修改,新提交的作業將進入默認池中,但是可以通過添加來spark.scheduler.pool設置作業池將本地配置提交到它們的線程池的SparkContext中,比如:

// Assuming sc is your SparkContext variable

sc.setLocalProperty("spark.scheduler.pool", "pool1”)

在設置本地屬性之後,所有在該線程中提交的作業(通過此線程中的調⽤用者發送給RDD的save,count,collect,etc)將使用這個池的名稱,同時這個設置是針對每個線程的,從而使的每個線程能

夠代表同一個用戶運行多個作業,如果想清除線程與之關聯的池,只需要調用:

sc.setLocalProperty("spark.scheduler.pool", null)

池的默認行為

默認情況下,每個池在集群中將獲得相同的份額(在默認池中共享每個作業),但是在每個池中,作業以FIFO順序運行,比如:如果你為用戶創建⼀、一個池,這意味著每個用戶將獲得集群的相同份額,並且每個用戶的查詢將以順序運行,而不是從該⽤、用戶的早期查詢結果中獲取。

10 配置池的屬性

特定池的屬性也可以通過配置文件進行修改,每個池支持三個屬性:

調度模式:可以是FIFO或者FAIR,來控制池中的作業是順序執行(默認情況下)還是共享池中的資源執行

權重:用來控制池相對其它池的共享份額,默認情況下,池的權重為1,比如,如果給一個特定的池權重為2,那麼它將獲得比其它池多兩倍的資源,設置一個⾼高權重(1000)也可能使的池在本質上

獲取優先權,權重位1000的池在得到job之後總能夠首先執行(譯註:如果一個池的權重設置的很高,那麼不管在什麼情況下,這個池的任務總是排在第一位執行)

minShare:除了整體的權重,每個池都可以得到一個管理員。希望它擁有的最小份額(作為CPU核⼼心數),在重新分配額外資源之前,公平調度程序會嘗試滿足所有活動的所需最小份額,因此

MinShare屬性可以是另一種方法,以確保能夠在不使集群的處於高度優先級的情況下,快速地獲取一定數量的資源(⽐比如:10核),默認情況下,每個池的minshare為0

可以通過創建⼀一個類似於conf/fairscheduler.xml.template 的xml⽂文件來設置池的屬性,並在sparkConf屬性中設置spark.scheduler.allocation

conf.set("spark.scheduler.allocation.file", "/path/to/file”)

XML的文件格式只是簡單的元素,裡面包含各種設置,比如:

FAIR

1

2

FIFO

2

3

一個完整的例子,可以在conf/fairscheduler.xml.template中查看,注意,在XML⽂文件中沒有配置的任何池只會得到所有設置的默認值(調度模式FIFO、權重1和minShare 0)。


分享到:


相關文章: