劉彬同學準備寫一系列spark實戰系列,本文是第一篇,SparkContext初始化內部原理!贊!推薦給大家, 希望大家喜歡和支持!
如果編寫Spark程序,那麼第⼀⾏代碼就是new SparkContext().setMaster(“”).setAppName(“xx”),可以說SparkContext是整個Spark 計算的啟動器,只有將sparkContext 啟動起來,後續的關於調 度,存儲,計算相關的操作才能夠運⾏,本⽂基於spark2.x源碼概述關於SparkContext⾥⾯所包含的啟動項都有哪些以及這些啟動項的作⽤是什麼,之後在說⼀下關於SparkEnv環境創建的過程。
閱讀本⽂最好打開spark源碼參考著看,可以在git上⾯打開spark關於sparkContext的代碼,地址為:https://github.com/apache/spark/blob/c5f9b89dda40ffaa4622a7ba2b3d0605dbe815c0/core/src/main/scala/org/apache/spark/SparkContext.scala#L73
01 SparkContext內部組件:
如圖為SparkContext內部的⼀些組件:
SparkEnv :Spark運⾏時環境,Spark 中任務執⾏是通過Executor,所有的Executor都有⾃⼰的執⾏環境SparkEnv,在Driver中也包含了SparkEnv,為了保證Local模式的運⾏,SparkEnv內部還提 供了不同的組件,來實現不同的功能
LiveListenerBus:SparkContext中的事件總線,可以接收各個使⽤者的事件,異步將SparkListenerevent傳遞給註冊的SparkListener
Spark UI :Spark的⽤戶界⾯,SparkUI間接依賴於計算引擎,調度引擎,存儲引擎,Job,Stage,Executor等組件的監控都會以SparkListenerEvent的形式傳遞給LiveListenerBus,SparkUI將從各 個SparkListener中讀取數據並顯⽰在web界⾯
SparkStatusTracker:⽤於監控作業和Stage進度狀態的低級API
ConsoleProgressBar :定期從sc.statusTracker獲得active stage的狀態信息,展⽰到進度條[在SparkUI上⾯可以看到進度條],會有⼀定的延時。內部有⼀個timer 500ms refresh⼀遍
DAGScheduler:DAG調度器,是Spark調度系統中重要的組件之⼀,負責創建Job,將DAG的RDD劃分到不同的Stage,提交stage等,SparkUI中有關Job和Stage監控數據都來⾃DAGScheduer
TaskScheduler:Task調度器,是Spark調度系統中重要的組件之⼀,負責將任務發送到集群,運⾏,如果有失敗的任務則重新執⾏,之後返回給DAGScheduler,TaskScheduler調度的Task是由 DAGScheduler創建的,所以DAGScheduler是TaskScheduler前置調度。
HeatbeatReceiver:⼼跳接收器,所有的Executor都會向HeatbeatReceiver發送⼼跳信息,HeatbeatReceiver接收到⼼跳之後,先更新Executor最後可⻅時間,然後將此信息交給TaskScheduler。
ContextCleaner:異步清理RDD、shuffle和⼲播狀態信息
EventLoggingListener:將事件持久化到存儲的監聽器,是SparkContext的可選組件,當spark.eventLog.enable
ExecutorAllocationManager: Executor動態分配管理器,根據⼯作負載動態調整Executor數量,當在配置spark.dynamicAlloction.enabled屬性為true的情況下,在⾮local模式下或者 spark.dynamicAllcation.testing屬性為true時啟⽤
ShutdownHookManager:設置關閉鉤⼦的管理器,可以給應⽤設置鉤⼦,這樣當JVM退出的時候就會執⾏清理⼯作
除了以上這些SparkContext包含的內部組件,還包括如下⼀些屬性:
creationSite:CallSite類型,保存著線程棧中最靠近棧頂的⽤戶定義的類和最靠近棧底的Scala或者Spark核⼼類的信息,其中ShortForm屬性保存著上述信息的間斷描述,LongForm屬性保存著上述 信息的完整描述,具體的信息可以參閱源碼部分地址為:core/src/main/scala/org/apache/spark/util/Utils.scala/getCallSite
allowMulitipleContext : 是否允許多個SparkContext實例,默認為False,可以通過設置Spark.Driver.allowMulitipleContexts來控制
startTime:標記sparkContext的啟動時間戳
stopped:標記sparkContext是否處於停⽌狀態,採⽤原⼦類型AtomicBoolean
addedFiles:⽤於每個本地⽂件的URL與添加此⽂件到到addedFiles時的時間戳之間的映射緩存 new ConcurrentHashMap[String, Long]
addedJars:⽤於每個本地Jar⽂件的URL與添加此⽂件到addedJars時的時間戳之間的映射緩存 new ConcurrentHashMap[String, Long]
persistentRdds:⽤於對所有持久化的RDD保持跟蹤
-
executorEnvs:⽤於存儲環境變量,將⽤於Executor執⾏的時候使⽤
sparkUser:當前系統的登錄⽤戶,可以通過環境變量SPARK_USER來設置 通過Utils.getCurrentUserName()獲取
checkpointDir:RDD計算過程中⽤於記錄RDD檢查點的⺫錄
localProperties:InheritableThreadLocal保護的線程,其中的屬性值可以沿著線程棧⼀直傳遞下去
_conf:SparkContext的配置,會先調⽤config的clone⽅法,在進⾏驗證配置,是否設置了spark.master和spark.app.name
jars:⽤戶提交的jar⽂件,當選擇部署模式為yarn時,
_jars是由spark.jars屬性指定的jar⽂件和spark.yarn.dist.jars屬性指定的並集 _files:⽤戶設置的⽂件,可以根據Spark.file屬性指定
_eventLogDir:事件⽇志的路徑,當spark.enabled屬性為true時啟⽤,默認為/tmp/spark-events,也可以通過spark.eventLog.dir來指定⺫錄 _eventLogCoder:事件⽇志的壓縮算法,當spark.eventLog.enabled屬性與spark.eventLog.compress屬性為true時,壓縮算法默認為lz4,也可以通過spark.io.compression.codec屬性指定,⺫前⽀持lzf,snappy和lz4
_hadoopConfiguration:Hadoop配置信息,如果系統屬性SPARK_YARN_MODE為true或者環境變量SPARK_YARN_MODEL為true,那麼將會是YARN的配置,否則為Hadoop的配置
_executorMemtory:Executor內存⼤⼩,默認為1024MB,可以通過設置環境變量(SPARK_MEM或者SPARK_EXECUTOR_MEMORY)或者Spark.executor.memory屬性指定其中Spark.executor.memory優先級最⾼
_applicationId:當前應⽤的標識,TaskScheduler啟動後會創建應⽤標識,通過調取TaskScheduler的ApplicationId獲取的
_applicationAttempId:當前應⽤嘗試執⾏的標識,SparkDriver在執⾏時會多次嘗試,每次嘗試都會⽣成⼀個標識來代表應⽤嘗試執⾏的⾝份
_listenerBusStarted:LiveListenerBus是否已經啟動的標記
nextShuffleId:類型為AtomicInteger,⽤於⽣成下⼀個shuffle標識
nextRddId:類型為atomicInteger,⽤於⽣成下⼀個rdd標識
02 初始化具體流程
創建SparkEnv
在Spark中,需要執⾏任務的地⽅就需要SparkEnv,在⽣產環境中,Spark往往運⾏於不同節點的Execute中,SparkEnv中的createDriverEnv⽤於創建SparkEnv,之後sparkEnv的實例通過set 設置到SparkEnv伴⽣對象env屬性中,然後在需要⽤到sparkEnv的地⽅直接通過伴⽣對象get獲取SparkEnv
創建⼼跳接受器(HeatbeatReceiver)
在Sparklocal運⾏模式中,driver和executor在同⼀個節點同⼀個進程中,所以driver和executor可以本地交互調⽤,但是在分佈式的環境中,driver和executor往往運⾏在不同的節點不同 的進程中,driver就⽆法監控executor的信息了,所以driver端創建了⼼跳接收器,那麼⼼跳接收器是如何創建的。 ⾸先通過SparkEnv的NettyRpcEnv(基於NettyRPC)的setupEndPoint⽅法,然後向Dispatcher註冊HeartbeatReceiver,並返回HeartbeatReceiver的NettyRpcEndPointRef的引⽤
.創建和啟動調度系統
Spark調度系統主要分為TaskScheduler和DAGScheduler,TaskScheduler負責請求集群管理器給應⽤程序分配並運⾏Executor並給Task分配Executor並執⾏,DAGScheduler主要⽤於在任務交 給TaskSchduler執⾏之前做⼀些準備⼯作,⽐如創建Job,將DAG的RDD劃分到不同的Stage,提交Stage等,如代碼:
val (sched, ts) = SparkContext. createTaskScheduler( this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler( this)
_heartbeatReceiver. ask[ Boolean]( TaskSchedulerIsSet)
SparkContext.createTaskScheduler⽅法⽤於創建和啟動TaskScheduler,針對不同的部署模式創建調度器的⽅式也不同,在代碼中,_schedulerBackend表⽰SchedulerBackend的引⽤, _taskScheduler表⽰TaskScheduler的引⽤,在TaskScheduler中還會創建DAGScheduler的實例,最後向_heartbeatReceiver發送TaskSchedulerSet的消息,HeartbeatReceiver接收到之後將獲取 SparkContext的_taskScheduler屬性設置到⾃⼰的Schduler屬性中
創建Executor動態分配管理器
ExecutorAllocationManager: Executor動態分配管理器,根據⼯作負載動態調整Executor數量,當在配置spark.dynamicAlloction.enabled屬性為true的情況下,在⾮local模式下或者 spark.dynamicAllcation.testing屬性為true時啟⽤
ExecutorAllocationManager內部會定期的根據負載計算所需的Executor數量,如果Executor需求數量⼤於之前向集群管理器申請的數量,那麼向集群管理器申請添加executor數量,反之,如果 executor需求數量⼩於之前向集群管理器申請的數量,那麼向集群管理器申請減少executor。此外,ExecutorAllocationManager還會定期向集群管理器申請移除已經過期的executor
創建和啟動ContextCleaner
ContextCleaner:異步清理RDD、shuffle和⼲播狀態信息
通過配置spark.cleaner.referenceTracking(默認為true)來決定是否啟⽤ContextCleaner
ContextCleaner的組成:
referencesQueue:緩存頂級的AnyRef引⽤
referencesBuffer:緩存AnyRef的虛引⽤
listeners:緩存清理⼯作中的監聽器數組
cleaningThread:清理具體⼯作的線程,此線程為守護線程
periodicGCService:⽤於執⾏GC的調度線程池
periodicGCInterval:執⾏GC的時間間隔,可通過spark.cleaner.periodicGC.interval配置,默認30分鐘
blockOnCleanUpTasks:清理⾮shuffle數據是否是阻塞的,可通過配置spark.cleaner.referenceTracking.blocking配置,默認是true
blockOnShuffleCleanUpTasks:清理shuffle數據是否是阻塞的,可通過配置spark.cleaner.referenceTracking.blocking.shuffle ,默認是false
stoped:標記contextCleaner是否停⽌狀態
以上可以在github上打開spark源碼進⾏邊看⽂章邊看源碼,你會受益良多。 在這⾥推薦⼀個github源碼閱讀插件Insight.io for Github 在chrome擴展程序裡可以直接查詢。
閱讀更多 大數據和雲計算技術 的文章