SparkContext 初始化內部原理

劉彬同學準備寫一系列spark實戰系列,本文是第一篇,SparkContext初始化內部原理!贊!推薦給大家,

希望大家喜歡和支持!

如果編寫Spark程序,那麼第⼀⾏代碼就是new SparkContext().setMaster(“”).setAppName(“xx”),可以說SparkContext是整個Spark 計算的啟動器,只有將sparkContext 啟動起來,後續的關於調 度,存儲,計算相關的操作才能夠運⾏,本⽂基於spark2.x源碼概述關於SparkContext⾥⾯所包含的啟動項都有哪些以及這些啟動項的作⽤是什麼,之後在說⼀下關於SparkEnv環境創建的過程。

閱讀本⽂最好打開spark源碼參考著看,可以在git上⾯打開spark關於sparkContext的代碼,地址為:https://github.com/apache/spark/blob/c5f9b89dda40ffaa4622a7ba2b3d0605dbe815c0/core/src/main/scala/org/apache/spark/SparkContext.scala#L73

01 SparkContext內部組件:

如圖為SparkContext內部的⼀些組件:

SparkContext 初始化內部原理

  • SparkEnv :Spark運⾏時環境,Spark 中任務執⾏是通過Executor,所有的Executor都有⾃⼰的執⾏環境SparkEnv,在Driver中也包含了SparkEnv,為了保證Local模式的運⾏,SparkEnv內部還提 供了不同的組件,來實現不同的功能

  • LiveListenerBus:SparkContext中的事件總線,可以接收各個使⽤者的事件,異步將SparkListenerevent傳遞給註冊的SparkListener

  • Spark UI :Spark的⽤戶界⾯,SparkUI間接依賴於計算引擎,調度引擎,存儲引擎,Job,Stage,Executor等組件的監控都會以SparkListenerEvent的形式傳遞給LiveListenerBus,SparkUI將從各 個SparkListener中讀取數據並顯⽰在web界⾯

  • SparkStatusTracker:⽤於監控作業和Stage進度狀態的低級API

  • ConsoleProgressBar :定期從sc.statusTracker獲得active stage的狀態信息,展⽰到進度條[在SparkUI上⾯可以看到進度條],會有⼀定的延時。內部有⼀個timer 500ms refresh⼀遍

  • DAGScheduler:DAG調度器,是Spark調度系統中重要的組件之⼀,負責創建Job,將DAG的RDD劃分到不同的Stage,提交stage等,SparkUI中有關Job和Stage監控數據都來⾃DAGScheduer

  • TaskScheduler:Task調度器,是Spark調度系統中重要的組件之⼀,負責將任務發送到集群,運⾏,如果有失敗的任務則重新執⾏,之後返回給DAGScheduler,TaskScheduler調度的Task是由 DAGScheduler創建的,所以DAGScheduler是TaskScheduler前置調度。

  • HeatbeatReceiver:⼼跳接收器,所有的Executor都會向HeatbeatReceiver發送⼼跳信息,HeatbeatReceiver接收到⼼跳之後,先更新Executor最後可⻅時間,然後將此信息交給TaskScheduler。

  • ContextCleaner:異步清理RDD、shuffle和⼲播狀態信息

  • EventLoggingListener:將事件持久化到存儲的監聽器,是SparkContext的可選組件,當spark.eventLog.enable

  • ExecutorAllocationManager: Executor動態分配管理器,根據⼯作負載動態調整Executor數量,當在配置spark.dynamicAlloction.enabled屬性為true的情況下,在⾮local模式下或者 spark.dynamicAllcation.testing屬性為true時啟⽤

  • ShutdownHookManager:設置關閉鉤⼦的管理器,可以給應⽤設置鉤⼦,這樣當JVM退出的時候就會執⾏清理⼯作

除了以上這些SparkContext包含的內部組件,還包括如下⼀些屬性:

  • creationSite:CallSite類型,保存著線程棧中最靠近棧頂的⽤戶定義的類和最靠近棧底的Scala或者Spark核⼼類的信息,其中ShortForm屬性保存著上述信息的間斷描述,LongForm屬性保存著上述 信息的完整描述,具體的信息可以參閱源碼部分地址為:core/src/main/scala/org/apache/spark/util/Utils.scala/getCallSite

  • allowMulitipleContext : 是否允許多個SparkContext實例,默認為False,可以通過設置Spark.Driver.allowMulitipleContexts來控制

  • startTime:標記sparkContext的啟動時間戳

  • stopped:標記sparkContext是否處於停⽌狀態,採⽤原⼦類型AtomicBoolean

  • addedFiles:⽤於每個本地⽂件的URL與添加此⽂件到到addedFiles時的時間戳之間的映射緩存 new ConcurrentHashMap[String, Long]

  • addedJars:⽤於每個本地Jar⽂件的URL與添加此⽂件到addedJars時的時間戳之間的映射緩存 new ConcurrentHashMap[String, Long]

  • persistentRdds:⽤於對所有持久化的RDD保持跟蹤

  • executorEnvs:⽤於存儲環境變量,將⽤於Executor執⾏的時候使⽤

  • sparkUser:當前系統的登錄⽤戶,可以通過環境變量SPARK_USER來設置 通過Utils.getCurrentUserName()獲取

  • checkpointDir:RDD計算過程中⽤於記錄RDD檢查點的⺫錄

  • localProperties:InheritableThreadLocal保護的線程,其中的屬性值可以沿著線程棧⼀直傳遞下去

  • _conf:SparkContext的配置,會先調⽤config的clone⽅法,在進⾏驗證配置,是否設置了spark.master和spark.app.name

  • jars:⽤戶提交的jar⽂件,當選擇部署模式為yarn時,

_jars是由spark.jars屬性指定的jar⽂件和spark.yarn.dist.jars屬性指定的並集 _files:⽤戶設置的⽂件,可以根據Spark.file屬性指定

_eventLogDir:事件⽇志的路徑,當spark.enabled屬性為true時啟⽤,默認為/tmp/spark-events,也可以通過spark.eventLog.dir來指定⺫錄 _eventLogCoder:事件⽇志的壓縮算法,當spark.eventLog.enabled屬性與spark.eventLog.compress屬性為true時,壓縮算法默認為lz4,也可以通過spark.io.compression.codec屬性指定,⺫前⽀持lzf,snappy和lz4

  • _hadoopConfiguration:Hadoop配置信息,如果系統屬性SPARK_YARN_MODE為true或者環境變量SPARK_YARN_MODEL為true,那麼將會是YARN的配置,否則為Hadoop的配置

  • _executorMemtory:Executor內存⼤⼩,默認為1024MB,可以通過設置環境變量(SPARK_MEM或者SPARK_EXECUTOR_MEMORY)或者Spark.executor.memory屬性指定其中Spark.executor.memory優先級最⾼

  • _applicationId:當前應⽤的標識,TaskScheduler啟動後會創建應⽤標識,通過調取TaskScheduler的ApplicationId獲取的

  • _applicationAttempId:當前應⽤嘗試執⾏的標識,SparkDriver在執⾏時會多次嘗試,每次嘗試都會⽣成⼀個標識來代表應⽤嘗試執⾏的⾝份

  • _listenerBusStarted:LiveListenerBus是否已經啟動的標記

  • nextShuffleId:類型為AtomicInteger,⽤於⽣成下⼀個shuffle標識

  • nextRddId:類型為atomicInteger,⽤於⽣成下⼀個rdd標識

02 初始化具體流程

  1. 創建SparkEnv

    在Spark中,需要執⾏任務的地⽅就需要SparkEnv,在⽣產環境中,Spark往往運⾏於不同節點的Execute中,SparkEnv中的createDriverEnv⽤於創建SparkEnv,之後sparkEnv的實例通過set 設置到SparkEnv伴⽣對象env屬性中,然後在需要⽤到sparkEnv的地⽅直接通過伴⽣對象get獲取SparkEnv

  2. 創建⼼跳接受器(HeatbeatReceiver)

    在Sparklocal運⾏模式中,driver和executor在同⼀個節點同⼀個進程中,所以driver和executor可以本地交互調⽤,但是在分佈式的環境中,driver和executor往往運⾏在不同的節點不同 的進程中,driver就⽆法監控executor的信息了,所以driver端創建了⼼跳接收器,那麼⼼跳接收器是如何創建的。 ⾸先通過SparkEnv的NettyRpcEnv(基於NettyRPC)的setupEndPoint⽅法,然後向Dispatcher註冊HeartbeatReceiver,並返回HeartbeatReceiver的NettyRpcEndPointRef的引⽤

  3. .創建和啟動調度系統

    Spark調度系統主要分為TaskScheduler和DAGScheduler,TaskScheduler負責請求集群管理器給應⽤程序分配並運⾏Executor並給Task分配Executor並執⾏,DAGScheduler主要⽤於在任務交 給TaskSchduler執⾏之前做⼀些準備⼯作,⽐如創建Job,將DAG的RDD劃分到不同的Stage,提交Stage等,如代碼:

    val (sched, ts) = SparkContext. createTaskScheduler( this, master, deployMode)

    _schedulerBackend = sched

    _taskScheduler = ts

    _dagScheduler = new DAGScheduler( this)

    _heartbeatReceiver. ask[ Boolean]( TaskSchedulerIsSet)

    SparkContext.createTaskScheduler⽅法⽤於創建和啟動TaskScheduler,針對不同的部署模式創建調度器的⽅式也不同,在代碼中,_schedulerBackend表⽰SchedulerBackend的引⽤, _taskScheduler表⽰TaskScheduler的引⽤,在TaskScheduler中還會創建DAGScheduler的實例,最後向_heartbeatReceiver發送TaskSchedulerSet的消息,HeartbeatReceiver接收到之後將獲取 SparkContext的_taskScheduler屬性設置到⾃⼰的Schduler屬性中

  4. 創建Executor動態分配管理器

    ExecutorAllocationManager: Executor動態分配管理器,根據⼯作負載動態調整Executor數量,當在配置spark.dynamicAlloction.enabled屬性為true的情況下,在⾮local模式下或者 spark.dynamicAllcation.testing屬性為true時啟⽤

    ExecutorAllocationManager內部會定期的根據負載計算所需的Executor數量,如果Executor需求數量⼤於之前向集群管理器申請的數量,那麼向集群管理器申請添加executor數量,反之,如果 executor需求數量⼩於之前向集群管理器申請的數量,那麼向集群管理器申請減少executor。此外,ExecutorAllocationManager還會定期向集群管理器申請移除已經過期的executor

  5. 創建和啟動ContextCleaner

    ContextCleaner:異步清理RDD、shuffle和⼲播狀態信息

    通過配置spark.cleaner.referenceTracking(默認為true)來決定是否啟⽤ContextCleaner

    ContextCleaner的組成:

    referencesQueue:緩存頂級的AnyRef引⽤

    referencesBuffer:緩存AnyRef的虛引⽤

    listeners:緩存清理⼯作中的監聽器數組

    cleaningThread:清理具體⼯作的線程,此線程為守護線程

    periodicGCService:⽤於執⾏GC的調度線程池

    periodicGCInterval:執⾏GC的時間間隔,可通過spark.cleaner.periodicGC.interval配置,默認30分鐘

    blockOnCleanUpTasks:清理⾮shuffle數據是否是阻塞的,可通過配置spark.cleaner.referenceTracking.blocking配置,默認是true

    blockOnShuffleCleanUpTasks:清理shuffle數據是否是阻塞的,可通過配置spark.cleaner.referenceTracking.blocking.shuffle ,默認是false

    stoped:標記contextCleaner是否停⽌狀態

以上可以在github上打開spark源碼進⾏邊看⽂章邊看源碼,你會受益良多。 在這⾥推薦⼀個github源碼閱讀插件Insight.io for Github 在chrome擴展程序裡可以直接查詢。


分享到:


相關文章: