DAGScheduler-作業提交

作業提交

以count算子為例,count算子是一個Action,會觸發Job的運行。count算子源碼,位於org.apache.spark.rdd.RDD#count

在RDD的源碼中,count方法出發了SparkContext的runJob方法來提交作業,這個提交在內部隱性調用runJob方法進行的,對用戶來說不用顯式地取提交作業。

對於RDD來說,會根據批次之間的依賴關係形成一個有向無環圖(DAG),然後把這個圖交給DAGSchedler處理。從源碼來看,SparkContext的runJob方法經過幾次調用後,進入DAGScheduler的runJob方法,位於org.apache.spark.SparkContext#runJob

在DAGScheduler的runJob方法中,調用submitJob方法繼續提交作業,這裡會發生阻塞,知道作業完成或失敗的結果;在submitJob方法裡,創建一個JobWriter對象,並藉助內部消息處理把這個對象發送給DAGScheduler的內嵌類DAGSchedulerEventProcessLoop進行處理;最後在DAGSchedulerEventProcessLoop消息接收onReceive方法中,接收到JobSubmitted樣例類完成匹配後,繼續調用DAGScheduler的handleJobSubmitted方法提交作業,在該方法中進行劃分Stage。

DAGScheduler的runJob方法,調用submitJob方法繼續提交作業,返回JobWriter對象,並等待任務處理成功或者失敗。位於org.apache.spark.scheduler.DAGScheduler#runJob

JobSubmitted

DAGScheduler的submitJob方法中,submitJob首先獲取rdd.partitions.length,校驗運行的時候partitions是否存在。submitJob的關鍵代碼是向DAGSchedulerEventProcessLoop發送JobSubmitted消息,JobSubmitted是一個case class,而不是一個case object。JobSubmitted的成員finalRDD是最後一個RDD。創建JobWaiter對象,對返回。

DAGScheduler的submitJob方法源碼,位於org.apache.spark.scheduler.DAGScheduler#submitJob

由Action導致SparkContext.runJob的執行,最終導致DAGScheduler中的submitJob的執行,其核心是通過發送case class JobSubmitted對象給DAGSchedulerEventProcessLoop。

JobSubmitted 源碼,位於org.apache.spark.scheduler.JobSubmitted

JobSubmitted是private[scheduler]級別的,用戶不能直接調用。JobSubmitted封裝了jobId、最後一個finalRDD、具體對RDD操作的函數func、哪些partitions要進行計算、作業監聽器、狀態等內容。

DAGSchedulerEventProcessLoop

DAGSchedulerEventProcessLoop繼承於EventLoop

EventLoop中開啟一個線程eventThread,線程設置成Daemon後臺運行的方式;run方法中調用onReceive(event)方法。其中post方法是向事件隊列eventQueue中放入一個元素。

EventLoop源碼,位於org.apache.spark.util.EventLoop

DAGSchedulerEventProcessLoop接收到JobSubmitted消息後,調用DAGScheduler的handleJobSubmitted方法提交作業,進行階段劃分。

DAGSchedulerEventProcessLoop源碼,位於org.apache.spark.scheduler.DAGSchedulerEventProcessLoop

EventLoop裡面開啟一個線程,線程裡面不斷循環一個隊列,post的時候就是將消息放到隊列中,由於消息放到消息隊列中,在不斷循環,所以可以拿到這個消息,轉過來調用onReceive(event),在onReceive處理的時候就調用了doOnReceive方法。


分享到:


相關文章: