Spark 常規性能調優

1 常規性能調優一:最優資源配置

Spark 性能調優的第一步,就是為任務分配更多的資源,在一定範圍內,增加資源的分配與性能的提升是成正比的,實現了最優的資源配置後(其實就是沒錢提升硬件後),在此基礎上再考慮進行後面論述的性能調優策略。

資源的分配在使用腳本提交 Spark 任務時進行指定,標準的 Spark 任務提交腳本如代碼清單2-1所示:

代碼清單 2-1 標準 Spark 提交腳本

/usr/opt/modules/spark/bin/spark-submit \\
--class com.test.spark.Analysis \\
--num-executors 80 \\
--driver-memory 6g \\
--executor-memory 6g \\
--executor-cores 3 \\
/usr/opt/modules/spark/jar/spark.jar \\

可以進行分配的資源如表 2-1 所示:

表 2-1 可分配資源表


Spark 常規性能調優

調節原則:儘量將任務分配的資源調節到可以使用的資源的最大限度。

對於具體資源的分配,我們分別討論 Spark 的兩種 Cluster 運行模式:

第一種是 Spark Standalone 模式,你在提交任務前,一定知道或者可以從運維部門獲取到你可以使用的資源情況,在編寫 submit 腳本的時候,就根據可用的資源情況進行資源的分配,比如說集群有 15 臺機器,每臺機器為 8G 內存,2 個 CPU core,那麼就指定 15 個 Executor,每個 Executor 分配 8G 內存,2 個 CPU core。

第二種是 Spark Yarn 模式,由於 Yarn 使用資源隊列進行資源的分配和調度,在寫 submit 腳本的時候,就根據 Spark 作業要提交到的資源隊列,進行資源的分配,比如資源隊列有 400G 內存,100 個 CPU core,那麼指定 50 個 Executor,每個 Executor 分配 8G 內存,2 個 CPU core。

對錶 2-1 中的各項資源進行了調節後,得到的性能提升如表 2-2 所示:

表2-2 資源調節後的性能提升

Spark 常規性能調優

補充:生產環境 Spark submit 腳本配置

/usr/local/spark/bin/spark-submit \\
--class com.test.spark.WordCount \\
--num-executors 80 \\
--driver-memory 6g \\
--executor-memory 6g \\
--executor-cores 3 \\
--master yarn-cluster \\
--queue root.default \\
--conf spark.yarn.executor.memoryOverhead=2048 \\
--conf spark.core.connection.ack.wait.timeout=300 \\
/usr/local/spark/spark.jar

參數配置參考值:

--num-executors:50~100

--driver-memory:1G~5G

--executor-memory:6G~10G

--executor-cores:3

--master:實際生產環境一定使用yarn-cluster

實際場景舉例:

上面說的這些 submit 提交,相信大家都需要做,但看完這小結後你在提交任務時會進行最優配置了嗎?沒有實際工作過的同學肯定還是不會,因為這些說白了還是理論,對是對,但實際生產中不能生搬硬套。

比如工作中一般會用到 CDH 的公司,比如小編的,一般會在 oozie 上通過建立 workflow 進行任務的調度,那麼對於跑 Spark 的任務,其參數配置就要視業務而定了。上面說到 --driver-memory 這個配置不必太大,一般都會比 --executor-memory 小,因為後者才是進行計算的地方。但小編的公司 --driver-memory 居然會設置成 --executor-memory 的兩倍。為啥?因為我們有的業務需要拉到 driver 端的緩存中,小就爆了,別問我為什麼拉到 driver 端,這多傻啊,業務場景不一樣,需要的計算方式也不一樣。當然我不否定會有更優的方式處理,這裡只是說理論不能偏離實際。

配置額外補充:動態資源分配(小編公司中使用的配置方式)

--conf spark.shuffle.service.enabled=true //啟用External shuffle Service服務
--conf spark.dynamicAllocation.enabled=true //開啟動態資源分配
--conf spark.dynamicAllocation.minExecutors=1 //每個Application最小分配的executor數
--conf spark.dynamicAllocation.maxExecutors=40 //每個Application最大併發分配的executor數

開啟動態分配策略後,application 會在 task 因沒有足夠資源被掛起的時候去動態申請資源,這種情況意味著該 application 現有的 executor 無法滿足所有 task 並行運行。spark 一輪一輪的申請資源,當有 task 掛起或等待 spark.dynamicAllocation.schedulerBacklogTimeout (默認1s)時間的時候,會開始動態資源分配;之後會每隔 spark.dynamicAllocation.sustainedSchedulerBacklogTimeout (默認1s)時間申請一次,直到申請到足夠的資源。每次申請的資源量是指數增長的,即1,2,4,8等。

之所以採用指數增長,出於兩方面考慮:其一,開始申請的少是考慮到可能 application 會馬上得到滿足;其次要成倍增加,是為了防止 application 需要很多資源,而該方式可以在很少次數的申請之後得到滿足。

2 常規性能調優二:RDD 優化

1.RDD 複用

在對 RDD 進行算子時,要避免相同的算子和計算邏輯之下對 RDD 進行重複的計算,如圖 2-1 所示:

Spark 常規性能調優

對圖 2-1 中的 RDD 計算架構進行修改,得到如圖 2-2 所示的優化結果:

Spark 常規性能調優

2.RDD 持久化

在 Spark 中,當多次對同一個 RDD 執行算子操作時,每一次都會對這個 RDD 以之前的父 RDD 重新計算一次,這種情況是必須要避免的,對同一個 RDD 的重複計算是對資源的極大浪費,因此,必須對多次使用的 RDD 進行持久化,通過持久化將公共 RDD 的數據緩存到內存/磁盤中,之後對於公共 RDD 的計算都會從內存/磁盤中直接獲取 RDD 數據。

對於 RDD 的持久化,有兩點需要說明:

第一,RDD 的持久化是可以進行序列化的,當內存無法將 RDD 的數據完整的進行存放的時候,可以考慮使用序列化的方式減小數據體積,將數據完整存儲在內存中。

第二,如果對於數據的可靠性要求很高,並且內存充足,可以使用副本機制,對 RDD 數據進行持久化。當持久化啟用了複本機制時,對於持久化的每個數據單元都存儲一個副本,放在其他節點上面,由此實現數據的容錯,一旦一個副本數據丟失,不需要重新計算,還可以使用另外一個副本。

  1. RDD 儘可能早的 filter 操作

獲取到初始 RDD 後,應該考慮儘早地過濾掉不需要的數據,進而減少對內存的佔用,從而提升 Spark 作業的運行效率。

3 常規性能調優三:並行度調節

Spark 作業中的並行度指各個 stage 的 task 的數量。

如果並行度設置不合理而導致並行度過低,會導致資源的極大浪費,例如,20 個 Executor,每個 Executor 分配 3 個 CPU core,而 Spark 作業有 40 個 task,這樣每個 Executor 分配到的 task 個數是 2 個,這就使得每個 Executor 有一個 CPU core 空閒,導致資源的浪費。

理想的並行度設置,應該是讓並行度與資源相匹配,簡單來說就是在資源允許的前提下,並行度要設置的儘可能大,達到可以充分利用集群資源。合理的設置並行度,可以提升整個 Spark 作業的性能和運行速度。

Spark 官方推薦,task 數量應該設置為 Spark 作業總 CPU core 數量的2~3倍。之所以沒有推薦 task 數量與 CPU core 總數相等,是因為 task 的執行時間不同,有的 task 執行速度快而有的 task 執行速度慢,如果 task 數量與 CPU core 總數相等,那麼執行快的 task 執行完成後,會出現 CPU core 空閒的情況。如果 task 數量設置為 CPU core 總數的2~3倍,那麼一個 task 執行完畢後,CPU core 會立刻執行下一個 task,降低了資源的浪費,同時提升了 Spark 作業運行的效率。

Spark作業並行度的設置如代碼清單2-2所示:

代碼清單2-2 Spark 作業並行度設置

val conf = new SparkConf().set("spark.default.parallelism", "500")

4 常規性能調優四:廣播大變量

默認情況下,task 中的算子中如果使用了外部的變量,每個 task 都會獲取一份變量的複本,這就造成了內存的極大消耗。一方面,如果後續對 RDD 進行持久化,可能就無法將 RDD 數據存入內存,只能寫入磁盤,磁盤 IO 將會嚴重消耗性能;另一方面,task 在創建對象的時候,也許會發現堆內存無法存放新創建的對象,這就會導致頻繁的 GC,GC 會導致工作線程停止,進而導致 Spark 暫停工作一段時間,嚴重影響 Spark 性能。

假設當前任務配置了 20 個 Executor,指定 500 個 task,有一個 20M 的變量被所有 task 共用,此時會在500 個 task 中產生 500 個副本,耗費集群 10G 的內存,如果使用了廣播變量, 那麼每個 Executor 保存一個副本,一共消耗 400M 內存,內存消耗減少了 5 倍。

廣播變量在每個 Executor 保存一個副本,此 Executor 的所有 task 共用此廣播變量,這讓變量產生的副本數量大大減少。

在初始階段,廣播變量只在 Driver 中有一份副本。task 在運行的時候,想要使用廣播變量中的數據,此時首先會在自己本地的 Executor 對應的 BlockManager 中嘗試獲取變量,如果本地沒有,BlockManager 就會從 Driver 或者其他節點的 BlockManager 上遠程拉取變量的複本,並由本地的 BlockManager 進行管理;之後此 Executor 的所有 task 都會直接從本地的 BlockManager 中獲取變量。

注意:這裡說的大變量也是有限制的,如果變量太大,超過了 executor 內存,那麼這個變量時放不進去的,就會 OOM。

5 常規性能調優五:Kryo 序列化

默認情況下,Spark 使用 Java 的序列化機制。Java 的序列化機制使用方便,不需要額外的配置,在算子中使用的變量實現 Serializable 接口即可,但是,Java 序列化機制的效率不高,序列化速度慢並且序列化後的數據所佔用的空間依然較大。

Kryo 序列化機制比 Java 序列化機制性能提高 10 倍左右,Spark 之所以沒有默認使用 Kryo 作為序列化類庫,是因為它不支持所有對象的序列化,同時 Kryo 需要用戶在使用前註冊需要序列化的類型,不夠方便,但從Spark 2.0.0 版本開始,簡單類型、簡單類型數組、字符串類型的 Shuffling RDDs 已經默認使用 Kryo 序列化方式了。

Kryo 序列化註冊方式的實例代碼如代碼清單 2-3 所示:

代碼清單 2-3 Kryo 序列化機制配置代碼

public class MyKryoRegistrator implements KryoRegistrator
{
@Override
public void registerClasses(Kryo kryo)
{
kryo.register(StartupReportLogs.class);
}
}

配置 Kryo 序列化方式的實例代碼如代碼清單2-4所示:

代碼清單 2-4 Kryo 序列化機制配置代碼

//創建SparkConf對象
val conf = new SparkConf().setMaster(…).setAppName(…)
//使用Kryo序列化庫,如果要使用Java序列化庫,需要把該行屏蔽掉
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
//在Kryo序列化庫中註冊自定義的類集合,如果要使用Java序列化庫,需要把該行屏蔽掉
conf.set("spark.kryo.registrator", "test.com.MyKryoRegistrator")

6 常規性能調優六:調節本地化等待時長

Spark 作業運行過程中,Driver 會對每一個 stage 的 task 進行分配。根據 Spark 的 task 分配算法,Spark 希望 task 能夠運行在它要計算的數據所在的節點(數據本地化思想),這樣就可以避免數據的網絡傳輸。通常來說,task 可能不會被分配到它處理的數據所在的節點,因為這些節點可用的資源可能已經用盡,此時,Spark 會等待一段時間,默認 3s,如果等待指定時間後仍然無法在指定節點運行,那麼會自動降級,嘗試將 task 分配到比較差的本地化級別所對應的節點上,比如將 task 分配到離它要計算的數據比較近的一個節點,然後進行計算,如果當前級別仍然不行,那麼繼續降級。

當 task 要處理的數據不在 task 所在節點上時,會發生數據的傳輸。task 會通過所在節點的 BlockManager 獲取數據,BlockManager 發現數據不在本地時,會通過網絡傳輸組件從數據所在節點的 BlockManager 處獲取數據。

網絡傳輸數據的情況是我們不願意看到的,大量的網絡傳輸會嚴重影響性能,因此,我們希望通過調節本地化等待時長,如果在等待時長這段時間內,目標節點處理完成了一部分task,那麼當前的task將有機會得到執行,這樣就能夠改善Spark作業的整體性能。

Spark的本地化等級如表2-3所示:

表2-3 Spark本地化等級


Spark 常規性能調優

在Spark項目開發階段,可以使用client模式對程序進行測試,此時,可以在本地看到比較全的日誌信息,日誌信息中有明確的task數據本地化的級別,如果大部分都是PROCESS_LOCAL,那麼就無需進行調節,但是如果發現很多的級別都是NODE_LOCAL、ANY,那麼需要對本地化的等待時長進行調節,通過延長本地化等待時長,看看task的本地化級別有沒有提升,並觀察Spark作業的運行時間有沒有縮短。

注意,過猶不及,不要將本地化等待時長延長地過長,導致因為大量的等待時長,使得Spark作業的運行時間反而增加了。

Spark本地化等待時長的設置如代碼清單2-5所示:

代碼清單2-5 Spark本地化等待時長設置示例

val conf = new SparkConf().set("spark.locality.wait", "6") 



分享到:


相關文章: