深入淺出理解 Spark:環境部署與工作原理


一、Spark 概述

Spark 是 UC Berkeley AMP Lab 開源的通用分佈式並行計算框架,目前已成為 Apache 軟件基金會的頂級開源項目。Spark 支持多種編程語言,包括 Java、Python、R 和 Scala,同時 Spark 也支持 Hadoop 的底層存儲系統 HDFS,但 Spark 不依賴 Hadoop。

1.1 Spark 與 Hadoop

Spark 基於 Hadoop MapReduce 算法實現的分佈式計算,擁有 Hadoop MapReduce 所具有的優點,並且具有更高的運算速度。Spark 能夠比 Hadoop 運算更快,主要原因是:Hadoop 在一次 MapReduce 運算之後,會將數據的運算結果從內存寫入到磁盤中,第二次 MapReduce 運算時在從磁盤中讀取數據,兩次對磁盤的操作,增加了多餘的 IO 消耗;而 Spark 則是將數據一直緩存在內存中,運算時直接從內存讀取數據,只有在必要時,才將部分數據寫入到磁盤中。除此之外,Spark 使用最先進的 DAG(Directed Acyclic Graph,有向無環圖)調度程序、查詢優化器和物理執行引擎,在處理批量處理以及處理流數據時具有較高的性能。按照Spark 官網的說法,Spark 相對於 Hadoop 而言,Spark 能夠達到 100 倍以上的運行負載。

深入淺出理解 Spark:環境部署與工作原理

圖1 Spark與Hadoop計算性能比較


(圖片來源:Apache Spark™)

1.2 Spark 架構及生態

Spark 除了 Spark Core 外,還有其它由多個組件組成,目前主要有四個組件:Spark SQL、Spark Streaming、MLlib、GraphX。這四個組件加上 Spark Core 組成了 Spark 的生態。通常,我們在編寫一個 Spark 應用程序,需要用到 Spark
Core 和其餘 4 個組件中的至少一個。Spark 的整體構架圖如下圖所示:

深入淺出理解 Spark:環境部署與工作原理

圖2 Spark構架圖


Spark Core:是 Spark 的核心,主要負責任務調度等管理功能。Spark
Core 的實現依賴於 RDDs(Resilient Distributed Datasets,
彈性分佈式數據集)的程序抽象概念。

Spark SQL:是 Spark 處理結構化數據的模塊,該模塊旨在將熟悉的 SQL 數據庫查詢與更復雜的基於算法的分析相結合,Spark
SQL 支持開源 Hive 項目及其類似 SQL 的 HiveQL 查詢語法。Spark
SQL 還支持 JDBC 和 ODBC 連接,能夠直接連接現有的數據庫。

Spark Streaming:這個模塊主要是對流數據的處理,支持流數據的可伸縮和容錯處理,可以與 Flume(針對數據日誌進行優化的一個系統)和 Kafka(針對分佈式消息傳遞進行優化的流處理平臺)等已建立的數據源集成。Spark Streaming 的實現,也使用 RDD 抽象的概念,使得在為流數據(如批量歷史日誌數據)編寫應用程序時,能夠更靈活,也更容易實現。

MLlib:

主要用於機器學習領域,它實現了一系列常用的機器學習和統計算法,如分類、迴歸、聚類、主成分分析等算法。

GraphX:這個模塊主要支持數據圖的分析和計算,並支持圖形處理的 Pregel API 版本。GraphX 包含了許多被廣泛理解的圖形算法,如 PageRank。


1.3 Spark 運行模式

Spark 有多種運行模式,由圖 2 中,可以看到 Spark 支持本地運行模式(Local 模式)、獨立運行模式(Standalone 模式)、Mesos、YARN(Yet Another Resource Negotiator)、Kubernetes 模式等。

本地運行模式是 Spark 中最簡單的一種模式,也可稱作偽分佈式模式。

獨立運行模式為 Spark 自帶的一種集群管理模式,Mesos 及 YARN 兩種模式也是比較常用的集群管理模式。相比較 Mesos 及 YARN 兩種模式而言,獨立運行模式是最簡單,也最容易部署的一種集群運行模式。

Kubernetes 是一個用於自動化部署、擴展和管理容器化應用程序的開源系統。

Spark 底層還支持多種數據源,能夠從其它文件系統讀取數據,如 HDFS、Amazon S3、Hypertable、HBase 等。Spark 對這些文件系統的支持,同時也豐富了整個 Spark 生態的運行環境。


二、Spark 部署模式

Spark 支持多種分佈式部署模式,主要支持三種部署模式,分別是:StandaloneSpark on YARNSpark on Mesos模式。

Standalone模式為 Spark 自帶的一種集群管理模式,即獨立模式,自帶完整的服務,可單獨部署到一個集群中,無需依賴任何其他資源管理系統。它是 Spark 實現的資源調度框架,其主要的節點有 Driver 節點、Master 節點和 Worker 節點。Standalone模式也是最簡單最容易部署的一種模式。

Spark on YARN模式即 Spark 運行在Hadoop YARN框架之上的一種模式。Hadoop YARN(Yet Another Resource
Negotiator,另一種資源協調者)是一種新的 Hadoop 資源管理器,它是一個通用資源管理系統,可為上層應用提供統一的資源管理和調度。

Spark on Mesos模式,即 Spark 運行在Apache Mesos框架之上的一種模式。Apache Mesos是一個更強大的分佈式資源管理框架,負責集群資源的分配,它允許多種不同的框架部署在其上,包括YARN。它被稱為是分佈式系統的內核。

三種架構都採用了Master/Worker(Slave)的架構,Spark 分佈式運行架構大致如下:

深入淺出理解 Spark:環境部署與工作原理


本文主要介紹 Spark 的Standalone模式的部署。


三、環境準備

出於學習的目的,本文將 Spark 部署在安裝有 CentOS7 系統的 VirtualBox 虛擬機中。

搭建 Spark 集群,需要準備以下文件及環境:

jdk-8u211-linux-x64.tar.gz

spark-2.4.3-bin-hadoop2.7.tgz

3 個獨立的 CentOS7 虛擬機系統,機器集群規劃如下:

深入淺出理解 Spark:環境部署與工作原理

四、安裝

4.1. 配置 jdk 環境

解壓文件:

<code>tar-zxfjdk-8u211-linux-x64.tar.gz
/<code>

配置環境變量:

<code>exportJAVA_HOME=/path/to/jdk1.8.0_211
exportPATH=$PATH:$JAVA_HOME/bin
/<code>

4.2. 配置 Spark 環境

解壓文件:

<code>tar-xfspark-2.4.3-bin-hadoop2.7.tgz
/<code>

配置環境變量:

<code>exportSPARK_HOME=/path/to/spark-2.4.3-bin-hadoop2.7
exportPATH=$PATH:$SPARK_HOME/bin
/<code>

修改spark-env.sh 文件

<code>cdspark-2.4.3-bin-hadoop2.7
cpconf/spark-env.sh.templateconf/spark-env.sh
vimconf/spark-env.sh
#增加如下內容:
exportJAVA_HOME=/path/to/jdk1.8.0_211
exportSPARK_MASTER_HOST=192.168.56.106
/<code>

修改slaves文件

<code>cpconf/slaves.templateconf/slaves
vimconf/slaves
#增加如下內容:
192.168.56.106
192.168.56.107
192.168.56.108
/<code>

4.3. 配置 ssh 免密登錄

配置 ssh 免密登錄,是為了能夠在master機器上來啟動所有worker節點,如果不配置免密登錄,則在啟動每個worker時,都需要輸入一遍密碼,會很麻煩。當然,如果機器少的話,也可以登錄到worker節點上,手動一個一個啟動worker

執行:ssh-keygen -t rsa,一直按回車即可。最後會生成類似這樣的日誌:

深入淺出理解 Spark:環境部署與工作原理

並且在用戶目錄下會自動生成.ssh目錄執行ls ~/.ssh可以看到兩個文件:

id_rsa 生成的私鑰文件

id_rsa.pub 生成的公鑰文件

id_rsa.pub複製到其它機器上,執行以下幾條命令:

<code>ssh-copy-id-i~/.ssh/[email protected]:# master所在的主機,如果master不做woker可以不需要。
ssh-copy-id-i~/.ssh/[email protected]:
ssh-copy-id-i~/.ssh/[email protected]:
/<code>

4.4 配置其它 worker 節點

當前已在master節點配置好了環境,還需要在其它worker節點上配置相類似的環境。

配置其它worker節點很簡單,只需要將jdk1.8.0_211spark-2.4.3-bin-hadoop2.7兩個目錄複製到其它

worker節點機器上即可。但要注意,這兩個目錄在其它 worker 上的絕對路徑需要與 master 上的絕對路徑一致,不然無法直接在 master 上啟動其它 worker 節點。

依次執行以下命令(如果已經配置好 ssh 免密,可以發現執行 scp 指令不需要兩次輸入密碼):

<code>scp-r/path/to/[email protected]:/path/to/jdk1.8.0_211
scp-r/path/to/[email protected]:/path/to/jdk1.8.0_211
scp-r/path/to/[email protected]:/path/to/spark-2.4.3-bin-hadoop2.7
scp-r/path/to/[email protected]:/path/to/spark-2.4.3-bin-hadoop2.7
/<code>

4.5 啟動 master

執行:

<code>sbin/start-master.sh
/<code>

輸入jps指令(該指令在$JAVA_HOME/bin 目錄下)可以查看 java 進程名,如輸入jps後,會顯示這樣的信息:

看到有Master字樣,說明master

進程啟動成功了,啟動master後,spark 默認會監聽8080端口,並可以通過瀏覽器打開 web 界面,在地址欄輸入http://192.168.56.106:8080,查看集群狀態。如下圖所示:

深入淺出理解 Spark:環境部署與工作原理

當前只啟動了master,所以看不到任何worker信息。

4.6 啟動 worker 節點

執行:

<code>sbin/slaves.sh
/<code>

會看到類似這樣的輸出:

深入淺出理解 Spark:環境部署與工作原理

再輸入jps,會列出當前啟動的java進程,顯示

Worker字樣,說明worker進程啟動成功了。

深入淺出理解 Spark:環境部署與工作原理

此時再刷新下打開的瀏覽器界面(http://192.168.56.106:8080),可以看到當前啟動了三個Worker節點。

深入淺出理解 Spark:環境部署與工作原理

也許你會發現界面上顯示的 Address 列,怎麼是 10 開頭的 ip 地址,並且都是一樣的,而不是 192 開頭的三個不同的 ip 地址。

深入淺出理解 Spark:環境部署與工作原理

這是因為虛擬機內有兩塊虛擬網卡,Spark 會讀取環境變量SPARK_LOCAL_IP,如果沒設置這個變量,Spark 就會使用getHostByName來獲取 ip 地址,會得到10.0.2.15這個 ip 地址。

深入淺出理解 Spark:環境部署與工作原理

要解決這個問題,有兩種方法:

(1) 將僅主機(Host-Only)網絡設置為網卡 1,將網絡地址轉換(NAT)設置為網卡 2。不過如果使用這種方法,重啟虛擬機後,如果是動態 ip,則 ip 地址會變化,會影響之前的配置。


(2) 另一種方法,可在conf/spark-env.sh中設置SPARK_LOCAL_IP這個變量,可以固定為一個 ip 地址,

<code>vimconf/spark-env.sh
#添加一行:
exportSPARK_LOCAL_IP=192.168.56.106
/<code>

在其他機器上同樣需要手動添加這一行,不過要修改為對應的機器 ip。覺得這樣有點麻煩。可以通過腳本動態獲取本機 ip 地址,在conf/spark-env.sh中添加這兩行:

<code>SPARK_LOCAL_IP=`python-c"importsocket;importfcntl;importstruct;print([(socket.inet_ntoa(fcntl.ioctl(s.fileno(),0x8915,struct.pack('256s','enp0s8'))[20:24]),s.close())forsin[socket.socket(socket.AF_INET,socket.SOCK_DGRAM)]][0][0])"`
exportSPARK_LOCAL_IP
/<code>


這樣就可以自動獲取本機的enp0s8這塊網卡的 ip 地址。

最後將修改後的conf/spark-env.sh這個文件複製到其它機器上:

執行:

<code>scpconf/[email protected]:/path/to/spark-2.4.3-bin-hadoop2.7/conf/spark-env.sh
scpconf/[email protected]:/path/to/spark-2.4.3-bin-hadoop2.7/conf/spark-env.sh
/<code>

重新啟動所有節點:

<code>sbin/stop-all.sh

sbin/start-all.sh
/<code>

最後刷新瀏覽器界面,可以看到有 3 個Woker啟動了,並且在 Address 列也可以看到都變為 192 開頭的 ip 地址了。

五、測試

{SPARK_HOME}/examples/src/main目錄下,有一些 spark 自帶的示例程序,有 java、python、r、scala 四種語言版本的程序。這裡主要測試 python 版的計算PI的程序。

<code>cd${SPARK_HOME}/examples/src/main/python
/<code>

pi.py程序提交到 spark 集群,執行:


<code>spark-submit --master spark://192.168.56.106:7077 pi.py/<code>

最後可以看到輸出這樣的日誌:

深入淺出理解 Spark:環境部署與工作原理

刷新瀏覽器界面,在Completed Applications欄可以看到一條記錄,即剛才執行的計算PI的 python 程序。

深入淺出理解 Spark:環境部署與工作原理

另外,如果覺得在終端中輸出的日誌太多,可以修改日誌級別:

<code>cp${SPARK_HOME}/conf/log4j.properties.template${SPARK_HOME}/conf/log4j.properties
vim${SPARK_HOME}/conf/log4j.properties
/<code>

修改日誌級別為WARN

深入淺出理解 Spark:環境部署與工作原理

再重新執行:spark-submit --master spark://192.168.56.106:7077 pi.py,可以看到輸出日誌少了很多。

深入淺出理解 Spark:環境部署與工作原理

除了提交 python 程序外,spark-submit 還可以提交打包好的javascala程序,可以執行spark-submit --help看具體用法。


Spark 配置文件說明

在下載下來的spark-2.4.3-bin-hadoop2.7.tgz中,conf 目錄下會默認存在這幾個文件,均為 Spark 的配置示例模板文件:

深入淺出理解 Spark:環境部署與工作原理

這些模板文件,均不會被 Spark 讀取,需要將.template後綴去除,Spark 才會讀取這些文件。這些配置文件中,在 Spark 集群中主要需要關注的是log4j.propertiesslavesspark-defaults.confspark-env.sh這四個配置文件。

log4j.properties的配置,可以參考Apache Log4j官網上的 Propertities 屬性配置說明。

slaves的配置,裡面為集群的所有worker節點的主機信息,可以為主機名,也可以為 ip 地址。

spark-defaults.conf的配置,可以參考Spark 官網的屬性配置頁。比如指定 master 節點地址,可以設置spark.master屬性;指定 executor 的運行時的核數,可以設置spark.executor.cores屬性等。

spark-env.sh

是 Spark 運行時,會讀取的一些環境變量,在本文中,主要設置了三個環境變量:JAVA_HOMESPARK_HOMESPARK_LOCAL_IP,這是 Spark 集群搭建過程中主要需要設置的環境變量。其它未設置的環境變量,Spark 均採用默認值。其它環境變量的配置說明,可以參考Spark 官網的環境變量配置頁。

至此,Spark 集群的Standalone模式部署全部結束。

對於 Spark 的學習,目前我掌握還比較淺,還在學習過程中。如果文章中有描述不準確,或不清楚的地方,希望給予指正,我會及時修改。謝謝!

關於 Spark 的學習,可以根據 Spark 官網上的指導快速入門:

https://spark.apache.org/docs/latest/quick-start.html


六、 Spark 中的計算模型

6.1 Spark 中的幾個主要基本概念

在 Spark 中,有幾個基本概念是需要先了解的,瞭解這些基本概念,對於後續在學習和使用 Spark 過程中,能更容易理解一些。

Application:基於 Spark 的用戶程序,即由用戶編寫的調用 Spark API 的應用程序,它由集群上的一個驅動(Driver)程序和多個執行器(Executor)程序組成。其中應用程序的入口為用戶所定義的 main 方法。

SparkContext:是 Spark 所有功能的主要入口點,它是用戶邏輯與 Spark 集群主要的交互接口。通過SparkContext,可以連接到集群管理器(Cluster Manager),能夠直接與集群 Master 節點進行交互,並能夠向 Master 節點申請計算資源,也能夠將應用程序用到的 JAR 包或 Python 文件發送到多個執行器(Executor)節點上。

Cluster Manager:即集群管理器,它存在於 Master 進程中,主要用來對應用程序申請的資源進行管理。

Worker Node:任何能夠在集群中能夠運行 Spark 應用程序的節點。

Task:SparkContext

發送到Executor節點上執行的一個工作單元。

Driver:也即驅動器節點,它是一個運行Applicationmain()函數並創建SparkContext的進程。Driver節點也負責提交Job,並將Job轉化為Task,在各個Executor進程間協調 Task 的調度。Driver節點可以不運行於集群節點機器上。

Executor:也即執行器節點,它是在一個在工作節點(Worker Node)上為Application啟動的進程,它能夠運行 Task 並將數據保存在內存或磁盤存儲中,也能夠將結果數據返回給Driver

根據以上術語的描述,通過下圖可以大致看到 Spark 程序在運行時的內部協調過程:

深入淺出理解 Spark:環境部署與工作原理

圖3 Spark應用程序運行時的內部協調過程


(圖片來源:Cluster Mode Overview)

除了以上幾個基本概念外,Spark 中還有幾個比較重要的概念。


6.2 RDD

6.2.1 基本概念

即彈性分佈式數據集(Resilient Distributed Datasets),是一種容錯的、可以被並行操作的元素集合,它是 Spark 中最重要的一個概念,是 Spark 對所有數據處理的一種基本抽象。Spark 中的計算過程可以簡單抽象為對 RDD 的創建、轉換和返回操作結果的過程:

深入淺出理解 Spark:環境部署與工作原理

圖4 Spark的RDD計算抽象過程

對於 Spark 的 RDD 計算抽象過程描述如下:

makeRDD:可以通過訪問外部物理存儲(如 HDFS),通過調用 SparkContext.textFile()方法來讀取文件並創建一個 RDD,也可以對輸入數據集合通過調用 SparkContext.parallelize()方法來創建一個 RDD。RDD 被創建後不可被改變,只可以對 RDD 執行 Transformation 及 Action 操作。

Transformation(轉換):對已有的 RDD 中的數據執行計算進行轉換,併產生新的 RDD,在這個過程中有時會產生中間 RDD。Spark 對於Transformation採用惰性計算機制,即在 Transformation 過程並不會立即計算結果,而是在 Action 才會執行計算過程。如mapfiltergroupByKey、cache等方法,只執行Transformation操作,而不計算結果。

Action(執行):對已有的 RDD 中的數據執行計算產生結果,將結果返回 Driver 程序或寫入到外部物理存儲(如 HDFS)。如reducecollect

countsaveAsTextFile等方法,會對 RDD 中的數據執行計算。


6.2.2 RDD 依賴關係

Spark 中 RDD 的每一次Transformation都會生成一個新的 RDD,這樣 RDD 之間就會形成類似於流水線一樣的前後依賴關係,在 Spark 中,依賴關係被定義為兩種類型,分別是窄依賴和寬依賴:

窄依賴(NarrowDependency):每個父 RDD 的一個分區最多被子 RDD 的一個分區所使用,即 RDD 之間是一對一的關係。窄依賴的情況下,如果下一個 RDD 執行時,某個分區執行失敗(數據丟失),只需要重新執行父 RDD 的對應分區即可進行數恢復。例如mapfilterunion等算子都會產生窄依賴。

寬依賴(WideDependency,或 ShuffleDependency):是指一個父 RDD 的分區會被子 RDD 的多個分區所使用,即 RDD 之間是一對多的關係。當遇到寬依賴操作時,數據會產生

Shuffle,所以也稱之為ShuffleDependency。寬依賴情況下,如果下一個 RDD 執行時,某個分區執行失敗(數據丟失),則需要將父 RDD 的所有分區全部重新執行才能進行數據恢復。例如groupByKeyreduceByKeysortByKey等操作都會產生寬依賴。

RDD 依賴關係如下圖所示:

深入淺出理解 Spark:環境部署與工作原理

圖5 RDD依賴關係


6.3 Partition

6.3.1 基本概念

partition(分區)是 Spark 中的重要概念,它是RDD的最小單元,RDD是由分佈在各個節點上的partition 組成的。partition的數量決定了task的數量,每個task對應著一個partition

例如,使用 Spark 來讀取本地文本文件內容,讀取完後,這些內容將會被分成多個partition,這些partition就組成了一個RDD,同時這些partition可以分散到不同的機器上執行。RDD 的 partition 描述如下圖所示:

深入淺出理解 Spark:環境部署與工作原理

圖6 RDD partition描述

partition的數量可以在創建 RDD 時指定,如果未指定 RDD 的 partition 大小,則在創建 RDD 時,Spark 將使用默認值,默認值為spark.default.parallelism配置的參數。


6.3.2 Partition 數量影響及調整

Partition 數量的影響:

如果 partition 數量太少,則直接影響是計算資源不能被充分利用。例如分配 8 個核,但 partition 數量為 4,則將有一半的核沒有利用到。

如果 partition 數量太多,計算資源能夠充分利用,但會導致 task 數量過多,而 task 數量過多會影響執行效率,主要是 task 在序列化和網絡傳輸過程帶來較大的時間開銷。

根據Spark RDD Programming Guide上的建議,集群節點的每個核分配 2-4 個partitions比較合理。

深入淺出理解 Spark:環境部署與工作原理

Partition 調整:

Spark 中主要有兩種調整 partition 的方法:coalesce、repartition

參考 pyspark 中的函數定義:

<code>defcoalesce(self,numPartitions,shuffle=False):
"""
ReturnanewRDDthatisreducedinto\\`numPartitions\\`partitions.
"""
/<code>
<code>defrepartition(self,numPartitions):
"""
*ReturnanewRDDthathasexactlynumPartitionspartitions.*

*CanincreaseordecreasethelevelofparallelisminthisRDD.*
*Internally,thisusesashuffletoredistributedata.*
*IfyouaredecreasingthenumberofpartitionsinthisRDD,consider*
*using\\`coalesce\\`,whichcanavoidperformingashuffle.*
*"""
returnself.coalesce(numPartitions,shuffle=True)
/<code>

從函數接口可以看到,reparation是直接調用coalesce(numPartitions, shuffle=True),不同的是,reparation函數可以增加或減少 partition 數量,調用repartition函數時,還會產生shuffle操作。而coalesce函數可以控制是否shuffle

,但當shuffleFalse時,只能減小partition數,而無法增大。

6.4 Job

前面提到,RDD 支持兩種類型的算子操作:TransformationAction。Spark 採用惰性機制,Transformation算子的代碼不會被立即執行,只有當遇到第一個Action算子時,會生成一個Job,並執行前面的一系列Transformation操作。一個Job包含NTransformation和 1 個Action

而每個Job會分解成一系列可並行處理的Task,然後將Task分發到不同的

Executor上運行,這也是 Spark 分佈式執行的簡要流程。


6.5 Stage

Spark 在對Job中的所有操作劃分Stage時,一般會按照倒序進行,依據 RDD 之間的依賴關係(寬依賴或窄依賴)進行劃分。即從Action開始,當遇到窄依賴類型的操作時,則劃分到同一個執行階段;遇到寬依賴操作,則劃分一個新的執行階段,且新的階段為之前階段的Parent,之前的階段稱作Child Stage,然後依次類推遞歸執行。Child Stage需要等待所有的Parent Stage執行完之後才可以執行,這時Stage之間根據依賴關係構成了一個大粒度的 DAG。

如下圖所示,為一個複雜的 DAG Stage 劃分示意圖:

深入淺出理解 Spark:環境部署與工作原理

圖7 Spark中Stage劃分過程

上圖為一個 Job,該 Job 生成的 DAG 劃分成了 3 個 Stage。上圖的 Stage 劃分過程是這樣的:從最後的Action開始,從後往前推,當遇到操作為NarrowDependency時,則將該操作劃分為同一個Stage,當遇到操作為ShuffleDependency時,則將該操作劃分為新的一個Stage


6.6 Task

Task為一個Stage中的一個執行單元,也是 Spark 中的最小執行單元,一般來說,一個 RDD 有多少個Partition,就會有多少個Task,因為每一個Task 只是處理一個Partition上的數據。在一個Stage內,所有的 RDD 操作以串行的 Pipeline 方式,由一組併發的

Task完成計算,這些Task的執行邏輯完全相同,只是作用於不同的Partition。每個Stage裡面Task的數目由該Stage最後一個 RDD 的Partition 個數決定。

Spark 中Task分為兩種類型,ShuffleMapTask 和 ResultTask,位於最後一個 Stage 的 Task 為 ResultTask,其他階段的屬於 ShuffleMapTask。ShuffleMapTask 和 ResultTask 分別類似於 Hadoop 中的 Map 和 Reduce。


七、Spark 調度原理

7.1 Spark 集群整體運行架構

深入淺出理解 Spark:環境部署與工作原理

圖8 Spark集群整體運行架構


(圖片來源:SparkInternals-Overview

Spark 集群分為 Master 節點和 Worker 節點,相當於 Hadoop 的 Master 和 Slave 節點。Master 節點上常駐 Master 守護進程,負責管理全部的 Worker 節點。Worker 節點上常駐 Worker 守護進程,負責與 Master 節點通信並管理 Executors。

Driver 為用戶編寫的 Spark 應用程序所運行的進程。Driver 程序可以運行在 Master 節點上,也可運行在 Worker 節點上,還可運行在非 Spark 集群的節點上。


7.2 Spark 調度器

Spark 中主要有兩種調度器:DAGScheduler 和 TaskScheduler,DAGScheduler 主要是把一個 Job 根據 RDD 間的依賴關係,劃分為多個 Stage,對於劃分後的每個 Stage 都抽象為一個由多個 Task 組成的任務集(TaskSet),並交給 TaskScheduler 來進行進一步的任務調度。TaskScheduler 負責對每個具體的 Task 進行調度。

7.2.1 DAGScheduler

當創建一個 RDD 時,每個 RDD 中包含一個或多個分區,當執行 Action 操作時,相應的產生一個 Job,而一個 Job 會根據 RDD 間的依賴關係分解為多個 Stage,每個 Stage 由多個 Task 組成(即 TaskSet),每個 Task 處理 RDD 中的一個 Partition。一個 Stage 裡面所有分區的任務集合被包裝為一個 TaskSet 交給 TaskScheduler 來進行任務調度。這個過程由是由 DAGScheduler 來完成的。DAGScheduler 對 RDD 的調度過程如下圖所示:


深入淺出理解 Spark:環境部署與工作原理


(圖片來源:Core Services behind Spark Job Execution

7.2.2 TaskScheduler

DAGScheduler 將一個 TaskSet 交給 TaskScheduler 後,TaskScheduler 會為每個 TaskSet 進行任務調度,Spark 中的任務調度分為兩種:FIFO(先進先出)調度和 FAIR(公平調度)調度。

FIFO 調度:即誰先提交誰先執行,後面的任務需要等待前面的任務執行。這是 Spark 的默認的調度模式。

FAIR 調度:支持將作業分組到池中,併為每個池設置不同的調度權重,任務可以按照權重來決定執行順序。

在 Spark 中使用哪種調度器可通過配置spark.scheduler.mode參數來設置,可選的參數有 FAIR 和 FIFO,默認是 FIFO。

FIFO 調度算法為 FIFOSchedulingAlgorithm,該算法的 comparator 方法的 Scala 源代碼如下:

<code>overridedefcomparator(s1:Schedulable,s2:Schedulable):Boolean={
valpriority1=s1.priority//priority實際為JobID
valpriority2=s2.priority
varres=math.signum(priority1-priority2)
if(res==0){
valstageId1=s1.stageId
valstageId2=s2.stageId
res=math.signum(stageId1-stageId2)
}
res<0
}
/<code>

根據以上代碼,FIFO 調度算法實現的是:對於兩個調度任務 s1 和 s2,首先比較兩個任務的優先級(Job ID)大小,如果 priority1 比 priority2 小,那麼返回 true,表示 s1 的優先級比 s2 的高。由於 Job ID 是順序生成的,先生成的 Job ID 比較小,所以先提交的 Job 肯定比後提交的 Job 優先級高,也即先提交的 Job 會被先執行。

如果 s1 和 s2 的 priority 相同,表示為同一個 Job 的不同 Stage,則比較 Stage ID,Stage ID 小則優先級高。

FAIR 調度算法為 FairSchedulingAlgorithm,該算法的 comparator 方法的 Scala 源代碼如下:

<code>overridedefcomparator(s1:Schedulable,s2:Schedulable):Boolean={
valminShare1=s1.minShare
valminShare2=s2.minShare
valrunningTasks1=s1.runningTasks
valrunningTasks2=s2.runningTasks
vals1Needy=runningTasks1<minshare1>vals2Needy=runningTasks2<minshare2>valminShareRatio1=runningTasks1.toDouble/math.max(minShare1,1.0)
valminShareRatio2=runningTasks2.toDouble/math.max(minShare2,1.0)
valtaskToWeightRatio1=runningTasks1.toDouble/s1.weight.toDouble
valtaskToWeightRatio2=runningTasks2.toDouble/s2.weight.toDouble

varcompare=0
if(s1Needy&&!s2Needy){
returntrue
}elseif(!s1Needy&&s2Needy){
returnfalse
}elseif(s1Needy&&s2Needy){
compare=minShareRatio1.compareTo(minShareRatio2)
}else{
compare=taskToWeightRatio1.compareTo(taskToWeightRatio2)
}
if(compare<0){
true
}elseif(compare>0){
false
}else{
s1.name<s2.name>}
}
/<s2.name>/<minshare2>/<minshare1>/<code>

由以上代碼可以看到,FAIR 任務調度主要由兩個因子來控制(關於 FAIR 調度的配置,可參考${SPARK_HOME}/conf/fairscheduler.xml.template文件):

weight:相對於其它池,它控制池在集群中的份額。默認情況下,所有池的權值為 1。例如,如果給定一個特定池的權重為 2,它將獲得比其它池多兩倍的資源。設置高權重(比如 1000)也可以實現池與池之間的優先級。如果設置為-1000,則該調度池一有任務就會馬上運行。

minShare:最小 CPU 核心數,默認是 0,它能確保池總是能夠快速地獲得一定數量的資源(例如 10 個核),在權重相同的情況下,minShare 越大,可以獲得更多的資源。

對以上代碼的理解:

如果 s1 所在的任務池正在運行的任務數量比 minShare 小,而 s2 所在的任務池正在運行的任務數量比 minShare 大,那麼 s1 會優先調度。反之,s2 優先調度。

如果 s1 和 s2 所在的任務池正在運行的 task 數量都比各自 minShare 小,那麼 minShareRatio 小的優先被調度。

如果 s1 和 s2 所在的任務池正在運行的 task 數量都比各自 minShare 大,那麼 taskToWeightRatio 小的優先被調度。

如果 minShareRatio 或 taskToWeightRatio 相同,那麼最後比較各自 Pool 的名字。


7.3 Spark RDD 調度過程

如下圖所示,Spark 對 RDD 執行調度的過程,創建 RDD 並生成 DAG,由 DAGScheduler 分解 DAG 為包含多個 Task(即 TaskSet)的 Stages,再將 TaskSet 發送至 TaskScheduler,由 TaskScheduler 來調度每個 Task,並分配到 Worker 節點上執行,最後得到計算結果。

深入淺出理解 Spark:環境部署與工作原理


(圖片來源:Spark2.3.2 source code analysis


分享到:


相關文章: