Spark Shuffle調優

1 Shuffle調優一:調節map端緩衝區大小

在Spark任務運行過程中,如果shuffle的map端處理的數據量比較大,但是map端緩衝的大小是固定的,可能會出現map端緩衝數據頻繁spill溢寫到磁盤文件中的情況,使得性能非常低下,通過調節map端緩衝的大小,可以避免頻繁的磁盤IO操作,進而提升Spark任務的整體性能。

map端緩衝的默認配置是32KB,如果每個task處理640KB的數據,那麼會發生640/32 = 20次溢寫,如果每個task處理64000KB的數據,機會發生64000/32=2000此溢寫,這對於性能的影響是非常嚴重的。

map端緩衝的配置方法如代碼清單2-7所示:

代碼清單2-7 map端緩衝配置

val conf = new SparkConf().set("spark.shuffle.file.buffer", "64") 

2 Shuffle調優二:調節reduce端拉取數據緩衝區大小

Spark Shuffle過程中,shuffle reduce task的buffer緩衝區大小決定了reduce task每次能夠緩衝的數據量,也就是每次能夠拉取的數據量,如果內存資源較為充足,適當增加拉取數據緩衝區的大小,可以減少拉取數據的次數,也就可以減少網絡傳輸的次數,進而提升性能。

reduce端數據拉取緩衝區的大小可以通過spark.reducer.maxSizeInFlight參數進行設置,默認為48MB,該參數的設置方法如代碼清單2-8所示:

代碼清單2-8 reduce端數據拉取緩衝區配置

val conf = new SparkConf().set("spark.reducer.maxSizeInFlight", "96") 

3 Shuffle調優三:調節reduce端拉取數據重試次數

Spark Shuffle過程中,reduce task拉取屬於自己的數據時,如果因為網絡異常等原因導致失敗會自動進行重試。對於那些包含了特別耗時的shuffle操作的作業,建議增加重試最大次數(比如60次),以避免由於JVM的full gc或者網絡不穩定等因素導致的數據拉取失敗。在實踐中發現,對於針對超大數據量(數十億~上百億)的shuffle過程,調節該參數可以大幅度提升穩定性。

reduce端拉取數據重試次數可以通過spark.shuffle.io.maxRetries參數進行設置,該參數就代表了可以重試的最大次數。如果在指定次數之內拉取還是沒有成功,就可能會導致作業執行失敗,默認為3,該參數的設置方法如代碼清單2-9所示:

代碼清單2-9 reduce端拉取數據重試次數配置

val conf = new SparkConf().set("spark.shuffle.io.maxRetries", "6") 

4 Shuffle調優四:調節reduce端拉取數據等待間隔

Spark Shuffle過程中,reduce task拉取屬於自己的數據時,如果因為網絡異常等原因導致失敗會自動進行重試,在一次失敗後,會等待一定的時間間隔再進行重試,可以通過加大間隔時長(比如60s),以增加shuffle操作的穩定性。

reduce端拉取數據等待間隔可以通過spark.shuffle.io.retryWait參數進行設置,默認值為5s,該參數的設置方法如代碼清單2-10所示:

代碼清單2-10 reduce端拉取數據等待間隔配置

val conf = new SparkConf().set("spark.shuffle.io.retryWait", "60s") 

5 Shuffle調優五:調節SortShuffle排序操作閾值

對於SortShuffleManager,如果shuffle reduce task的數量小於某一閾值則shuffle write過程中不會進行排序操作,而是直接按照未經優化的HashShuffleManager的方式去寫數據,但是最後會將每個task產生的所有臨時磁盤文件都合併成一個文件,並會創建單獨的索引文件。

當你使用SortShuffleManager時,如果的確不需要排序操作,那麼建議將這個參數調大一些,大於shuffle read task的數量,那麼此時map-side就不會進行排序了,減少了排序的性能開銷,但是這種方式下,依然會產生大量的磁盤文件,因此shuffle write性能有待提高。

SortShuffleManager排序操作閾值的設置可以通過spark.shuffle.sort. bypassMergeThreshold這一參數進行設置,默認值為200,該參數的設置方法如代碼清單2-11所示:

代碼清單2-10 reduce端拉取數據等待間隔配置

val conf = new SparkConf().set("spark.shuffle.sort.bypassMergeThreshold", "400")  



分享到:


相關文章: