一、shuffle原理分析
1.1 shuffle概述
一、shuffle原理分析
1.1 shuffle概述
Shuffle就是對數據進行重組,由於分佈式計算的特性和要求,在實現細節上更加繁瑣和複雜。
在MapReduce框架,Shuffle是連接Map和Reduce之間的橋樑,Map階段通過shuffle讀取數據並輸出到對應的Reduce,而Reduce階段負責從Map端拉取數據並進行計算。在整個shuffle過程中,往往伴隨著大量的磁盤和網絡I/O。所以shuffle性能的高低也直接決定了整個程序的性能高低。而Spark也會有自己的shuffle實現過程。
1.2 Spark中的 shuffle 介紹
在DAG調度的過程中,Stage 階段的劃分是根據是否有shuffle過程,也就是存在 寬依賴 的時候,需要進行shuffle,這時候會將 job 劃分成多個Stage,每一個 Stage 內部有很多可以並行運行的 Task。
stage與stage之間的過程就是 shuffle 階段,在 Spark 中,負責 shuffle 過程的執行、計算和處理的組件主要就是 ShuffleManager 。ShuffleManager 隨著Spark的發展有兩種實現的方式,分別為 HashShuffleManager 和 SortShuffleManager ,因此spark的Shuffle有 Hash Shuffle 和
Sort Shuffle 兩種。1.3 HashShuffle機制
1.3.1 HashShuffle 的介紹
在 Spark 1.2 以前,默認的shuffle計算引擎是 HashShuffleManager 。
HashShuffleManager 有著一個非常嚴重的弊端,就是會產生大量的中間磁盤文件,進而由大量的磁盤IO操作影響了性能。因此在Spark 1.2以後的版本中,默認的 ShuffleManager 改成了 SortShuffleManager 。
SortShuffleManager 相較於 HashShuffleManager 來說,有了一定的改進。主要就在於每個Task在進行shuffle操作時,雖然也會產生較多的臨時磁盤文件,但是最後會將所有的臨時文件合併(merge)成一個磁盤文件,因此每個 Task 就只有一個磁盤文件。在下一個 Stage 的shuffle read task拉取自己的數據時,只要根據索引讀取每個磁盤文件中的部分數據即可。
Hash shuffle是不具有排序的Shuffle。
1.3.2 普通機制的Hash shuffle
HashShuffleManager的運行機制主要分成兩種:一種是 普通運行機制 ,另一種是 合併運行機制 ,而合併機制主要是通過複用buffer來優化Shuffle過程中產生的小文件的數量。
先簡單說明一下情況。此時任務劃分為了兩個 Stage ,第一個 Stage 最上方有4個 MapTask , 而第二個 Stage 有3個 ReduceTask,但是如果我們現在的MapTask增多成1000個,那我們所產生的 block file 那不就有 MapTask*3 這麼多了,在這時大量的IO操作會造成很大的性能問題
1.3.3 普通機制的 Hash shuffle 的步驟詳細說明
這裡我們先明確一個假設前提:每個Executor只有1個CPU core,也就是說,無論這個Executor上分配多少個task線程,同一時間都只能執行一個task線程。
圖中有3個ReduceTask,從ShuffleMapTask 開始那邊各自把自己進行 Hash 計算(分區器:hash/numReduce取模),分類出3個不同的類別,每個 ShuffleMapTask 都分成3種類別的數據,想把不同的數據匯聚然後計算出最終的結果,所以ReduceTask 會在屬於自己類別的數據收集過來,匯聚成一個同類別的大集合,每1個 ShuffleMapTask 輸出3份本地文件,這裡有4個 ShuffleMapTask,所以總共輸出了4 x 3個分類文件 = 12個本地小文件。
Shuffle Write 階段:
主要就是在一個stage結束計算之後,為了下一個stage可以執行shuffle類的算子(比如reduceByKey,groupByKey),而將每個task處理的數據按key進行分區。所謂 “分區”,就是對相同的key執行hash算法,從而將相同key都寫入同一個磁盤文件中,而每一個磁盤文件都只屬於reduce端的stage的一個task。在將數據寫入磁盤之前,會先將數據寫入內存緩衝中,當內存緩衝填滿之後,才會溢寫到磁盤文件中去。
那麼每個執行 Shuffle Write 的 Task,要為下一個 Stage 創建多少個磁盤文件呢? 很簡單,下一個stage的task有多少個,當前stage的每個task就要創建多少份磁盤文件。比如下一個stage總共有100個task,那麼當前stage的每個task都要創建100份磁盤文件。如果當前stage有50個task,總共有10個Executor,每個Executor執行5個Task,那麼每個Executor上總共就要創建500個磁盤文件,所有Executor上會創建5000個磁盤文件。由此可見,未經優化的shuffle write操作所產生的磁盤文件的數量是極其驚人的。
Shuffle Read 階段:
Shuffle Read,通常就是一個stage剛開始時要做的事情。此時該stage的每一個task就需要將上一個stage的計算結果中的所有相同key,從各個節點上通過網絡都拉取到自己所在的節點上,然後進行key的聚合或連接等操作。由於shuffle write的過程中,task給Reduce端的stage的每個task都創建了一個磁盤文件,因此shuffle read的過程中,每個task只要從上游stage的所有task所在節點上,拉取屬於自己的那一個磁盤文件即可。
Shuffle Read的拉取過程是一邊拉取一邊進行聚合的。每個shuffle read task都會有一個自己的buffer緩衝,每次都只能拉取與buffer緩衝相同大小的數據,然後通過內存中的一個Map進行聚合等操作。聚合完一批數據後,再拉取下一批數據,並放到buffer緩衝中進行聚合操作。以此類推,直到最後將所有數據到拉取完,並得到最終的結果。
注意:
- buffer起到的是緩存作用,緩存能夠加速寫磁盤,提高計算的效率,buffer的默認大小32k。
- 分區器:根據hash/numRedcue取模決定數據由幾個Reduce處理,也決定了寫入幾個buffer中
- block file:磁盤小文件,從圖中我們可以知道磁盤小文件的個數計算公式:block file=M*R 。 M為map task的數量,R為Reduce的數量,一般Reduce的數量等於buffer的數量,都是由分區器決定的
Hash shuffle普通機制的問題:
- Shuffle階段在磁盤上會產生海量的小文件,建立通信和拉取數據的次數變多,此時會產生大量耗時低效的 IO 操作 (因為產生過多的小文件)
- 可能導致 OOM,大量耗時低效的 IO 操作 ,導致寫磁盤時的對象過多,讀磁盤時候的對象也過多,這些對象存儲在堆內存中,會導致堆內存不足,相應會導致頻繁的GC,GC會導致OOM。由於內存中需要保存海量文件操作句柄和臨時信息,如果數據處理的規模比較龐大的話,內存不可承受,會出現 OOM 等問題
1.3.4 合併機制的Hash shuffle
合併機制就是複用buffer緩衝區,開啟合併機制的配置是spark.shuffle.consolidateFiles。該參數默認值為false,將其設置為true即可開啟優化機制。通常來說,如果我們使用HashShuffleManager,那麼都建議開啟這個選項。
這裡有6個這裡有6個shuffleMapTask,數據類別還是分成3種類型,因為Hash算法會根據你的 Key 進行分類,在同一個進程中,無論是有多少過Task,都會把同樣的Key放在同一個Buffer裡,然後把Buffer中的數據寫入以Core數量為單位的本地文件中,(一個Core只有一種類型的Key的數據),每1個Task所在的進程中,分別寫入共同進程中的3份本地文件,這裡有6個shuffleMapTasks,所以總共輸出是 2個Cores x 3個分類文件 = 6個本地小文件。
此時block file = Core * R ,Core為CPU的核數,R為Reduce的數量,但是如果 Reducer 端的並行任務或者是數據分片過多的話則 Core * Reducer Task 依舊過大,也會產生很多小文件。
1.4 Sort shuffle
SortShuffleManager的運行機制也是主要分成兩種,普通運行機制 和 bypass運行機制
1.4.1 Sort shuffle 的普通機制
在該模式下,數據會先寫入一個數據結構,聚合算子寫入 Map,一邊通過 Map 局部聚合,一遍寫入內存。Join 算子寫入 ArrayList 直接寫入 內存 中。然後需要判斷是否達到 閾值(5M),如果達到就會將內存數據結構的數據寫入到磁盤,清空內存數據結構。
在溢寫磁盤前,先根據 key 進行 排序,排序 過後的數據,會分批寫入到磁盤文件中。默認批次為10000條,數據會以每批一萬條寫入到磁盤文件。寫入磁盤文件通過緩衝區溢寫的方式,每次溢寫都會產生一個磁盤文件,也就是說一個task過程會產生多個臨時文件。
最後在每個task中,將所有的臨時文件合併,這就是 merge 過程,此過程將所有臨時文件讀取出來,一次寫入到最終文件。意味著一個task的所有數據都在這一個文件中。同時單獨寫一份索引文件,標識下游各個task的數據在文件中的索引start offset和end offset(比如對於wordCount,下標從哪裡(start offset)到哪裡(end offset)是這個單詞)。
這個機制的好處:
- 小文件明顯變少了,一個task只生成一個file文件
- file文件整體有序,加上索引文件的輔助,查找變快,雖然排序浪費一些性能,但是查找變快很多
1.4.2 bypass模式的sortShuffle
bypass機制運行條件是shuffle map task數量小於spark.shuffle.sort.bypassMergeThreshold參數(默認值200)的值,且不是聚合類的shuffle算子(比如reduceByKey)
在 shuffleMapTask 數量 小於默認值200 時,啟用bypass模式的 sortShuffle,並沒有進行sort,原因是數據量本身比較少,沒必要進行sort全排序,因為數據量少本身查詢速度就快,正好省了sort的那部分性能開銷。
1.5 使用到的參數
1.5.1 spark.shuffle.file.buffer
buffer大小默認是32K,為了減少磁盤溢寫的次數,可以適當調整這個數值的大小。降低磁盤IO
1.5.2 spark.reducer.MaxSizeFlight
ReduceTask 拉取數據量的大小,默認48M
1.5.3 spark.shuffle.memoryFraction
shuffle聚合內存的比例,佔用executor內存比例的大小
1.5.4 spark.shuffle.io.maxRetries
拉取數據重試次數,防止網絡抖動帶來的影響
1.5.5 spark.shuffle.io.retryWait
調整到重試間隔時間,拉取失敗後多久才重新進行拉取
1.5.6 spark.shuffle.consolidateFiles
針對 HashShuffle 合併機制
1.5.7 spark.shuffle.sort.bypassMergeThreshold
SortShuffle bypass機制,默認200次
1.5.8 spark.sql.shuffle.partitions
默認200,shuffle時所使用到的分區數,也就是你生成的 part-00000,part-00001···最多也就只能 part-00199 了
Shuffle就是對數據進行重組,由於分佈式計算的特性和要求,在實現細節上更加繁瑣和複雜。
在MapReduce框架,Shuffle是連接Map和Reduce之間的橋樑,Map階段通過shuffle讀取數據並輸出到對應的Reduce,而Reduce階段負責從Map端拉取數據並進行計算。在整個shuffle過程中,往往伴隨著大量的磁盤和網絡I/O。所以shuffle性能的高低也直接決定了整個程序的性能高低。而Spark也會有自己的shuffle實現過程。
1.2 Spark中的 shuffle 介紹
在DAG調度的過程中,Stage 階段的劃分是根據是否有shuffle過程,也就是存在 寬依賴 的時候,需要進行shuffle,這時候會將 job 劃分成多個Stage,每一個 Stage 內部有很多可以並行運行的 Task。
stage與stage之間的過程就是 shuffle 階段,在 Spark 中,負責 shuffle 過程的執行、計算和處理的組件主要就是 ShuffleManager 。ShuffleManager 隨著Spark的發展有兩種實現的方式,分別為 HashShuffleManager 和 SortShuffleManager ,因此spark的Shuffle有 Hash Shuffle 和
Sort Shuffle 兩種。1.3 HashShuffle機制
1.3.1 HashShuffle 的介紹
在 Spark 1.2 以前,默認的shuffle計算引擎是 HashShuffleManager 。
HashShuffleManager 有著一個非常嚴重的弊端,就是會產生大量的中間磁盤文件,進而由大量的磁盤IO操作影響了性能。因此在Spark 1.2以後的版本中,默認的 ShuffleManager 改成了 SortShuffleManager 。
SortShuffleManager 相較於 HashShuffleManager 來說,有了一定的改進。主要就在於每個Task在進行shuffle操作時,雖然也會產生較多的臨時磁盤文件,但是最後會將所有的臨時文件合併(merge)成一個磁盤文件,因此每個 Task 就只有一個磁盤文件。在下一個 Stage 的shuffle read task拉取自己的數據時,只要根據索引讀取每個磁盤文件中的部分數據即可。
Hash shuffle是不具有排序的Shuffle。
1.3.2 普通機制的Hash shuffle
HashShuffleManager的運行機制主要分成兩種:一種是 普通運行機制 ,另一種是 合併運行機制 ,而合併機制主要是通過複用buffer來優化Shuffle過程中產生的小文件的數量。
先簡單說明一下情況。此時任務劃分為了兩個 Stage ,第一個 Stage 最上方有4個 MapTask , 而第二個 Stage 有3個 ReduceTask,但是如果我們現在的MapTask增多成1000個,那我們所產生的 block file 那不就有 MapTask*3 這麼多了,在這時大量的IO操作會造成很大的性能問題
1.3.3 普通機制的 Hash shuffle 的步驟詳細說明
這裡我們先明確一個假設前提:每個Executor只有1個CPU core,也就是說,無論這個Executor上分配多少個task線程,同一時間都只能執行一個task線程。
圖中有3個ReduceTask,從ShuffleMapTask 開始那邊各自把自己進行 Hash 計算(分區器:hash/numReduce取模),分類出3個不同的類別,每個 ShuffleMapTask 都分成3種類別的數據,想把不同的數據匯聚然後計算出最終的結果,所以ReduceTask 會在屬於自己類別的數據收集過來,匯聚成一個同類別的大集合,每1個 ShuffleMapTask 輸出3份本地文件,這裡有4個 ShuffleMapTask,所以總共輸出了4 x 3個分類文件 = 12個本地小文件。
Shuffle Write 階段:
主要就是在一個stage結束計算之後,為了下一個stage可以執行shuffle類的算子(比如reduceByKey,groupByKey),而將每個task處理的數據按key進行分區。所謂 “分區”,就是對相同的key執行hash算法,從而將相同key都寫入同一個磁盤文件中,而每一個磁盤文件都只屬於reduce端的stage的一個task。在將數據寫入磁盤之前,會先將數據寫入內存緩衝中,當內存緩衝填滿之後,才會溢寫到磁盤文件中去。
那麼每個執行 Shuffle Write 的 Task,要為下一個 Stage 創建多少個磁盤文件呢? 很簡單,下一個stage的task有多少個,當前stage的每個task就要創建多少份磁盤文件。比如下一個stage總共有100個task,那麼當前stage的每個task都要創建100份磁盤文件。如果當前stage有50個task,總共有10個Executor,每個Executor執行5個Task,那麼每個Executor上總共就要創建500個磁盤文件,所有Executor上會創建5000個磁盤文件。由此可見,未經優化的shuffle write操作所產生的磁盤文件的數量是極其驚人的。
Shuffle Read 階段:
Shuffle Read,通常就是一個stage剛開始時要做的事情。此時該stage的每一個task就需要將上一個stage的計算結果中的所有相同key,從各個節點上通過網絡都拉取到自己所在的節點上,然後進行key的聚合或連接等操作。由於shuffle write的過程中,task給Reduce端的stage的每個task都創建了一個磁盤文件,因此shuffle read的過程中,每個task只要從上游stage的所有task所在節點上,拉取屬於自己的那一個磁盤文件即可。
Shuffle Read的拉取過程是一邊拉取一邊進行聚合的。每個shuffle read task都會有一個自己的buffer緩衝,每次都只能拉取與buffer緩衝相同大小的數據,然後通過內存中的一個Map進行聚合等操作。聚合完一批數據後,再拉取下一批數據,並放到buffer緩衝中進行聚合操作。以此類推,直到最後將所有數據到拉取完,並得到最終的結果。
注意:
- buffer起到的是緩存作用,緩存能夠加速寫磁盤,提高計算的效率,buffer的默認大小32k。
- 分區器:根據hash/numRedcue取模決定數據由幾個Reduce處理,也決定了寫入幾個buffer中
- block file:磁盤小文件,從圖中我們可以知道磁盤小文件的個數計算公式:block file=M*R 。 M為map task的數量,R為Reduce的數量,一般Reduce的數量等於buffer的數量,都是由分區器決定的
Hash shuffle普通機制的問題:
- Shuffle階段在磁盤上會產生海量的小文件,建立通信和拉取數據的次數變多,此時會產生大量耗時低效的 IO 操作 (因為產生過多的小文件)
- 可能導致 OOM,大量耗時低效的 IO 操作 ,導致寫磁盤時的對象過多,讀磁盤時候的對象也過多,這些對象存儲在堆內存中,會導致堆內存不足,相應會導致頻繁的GC,GC會導致OOM。由於內存中需要保存海量文件操作句柄和臨時信息,如果數據處理的規模比較龐大的話,內存不可承受,會出現 OOM 等問題
1.3.4 合併機制的Hash shuffle
合併機制就是複用buffer緩衝區,開啟合併機制的配置是spark.shuffle.consolidateFiles。該參數默認值為false,將其設置為true即可開啟優化機制。通常來說,如果我們使用HashShuffleManager,那麼都建議開啟這個選項。
這裡有6個這裡有6個shuffleMapTask,數據類別還是分成3種類型,因為Hash算法會根據你的 Key 進行分類,在同一個進程中,無論是有多少過Task,都會把同樣的Key放在同一個Buffer裡,然後把Buffer中的數據寫入以Core數量為單位的本地文件中,(一個Core只有一種類型的Key的數據),每1個Task所在的進程中,分別寫入共同進程中的3份本地文件,這裡有6個shuffleMapTasks,所以總共輸出是 2個Cores x 3個分類文件 = 6個本地小文件。
此時block file = Core * R ,Core為CPU的核數,R為Reduce的數量,但是如果 Reducer 端的並行任務或者是數據分片過多的話則 Core * Reducer Task 依舊過大,也會產生很多小文件。
1.4 Sort shuffle
SortShuffleManager的運行機制也是主要分成兩種,普通運行機制 和 bypass運行機制
1.4.1 Sort shuffle 的普通機制
在該模式下,數據會先寫入一個數據結構,聚合算子寫入 Map,一邊通過 Map 局部聚合,一遍寫入內存。Join 算子寫入 ArrayList 直接寫入 內存 中。然後需要判斷是否達到 閾值(5M),如果達到就會將內存數據結構的數據寫入到磁盤,清空內存數據結構。
在溢寫磁盤前,先根據 key 進行 排序,排序 過後的數據,會分批寫入到磁盤文件中。默認批次為10000條,數據會以每批一萬條寫入到磁盤文件。寫入磁盤文件通過緩衝區溢寫的方式,每次溢寫都會產生一個磁盤文件,也就是說一個task過程會產生多個臨時文件。
最後在每個task中,將所有的臨時文件合併,這就是 merge 過程,此過程將所有臨時文件讀取出來,一次寫入到最終文件。意味著一個task的所有數據都在這一個文件中。同時單獨寫一份索引文件,標識下游各個task的數據在文件中的索引start offset和end offset(比如對於wordCount,下標從哪裡(start offset)到哪裡(end offset)是這個單詞)。
這個機制的好處:
- 小文件明顯變少了,一個task只生成一個file文件
- file文件整體有序,加上索引文件的輔助,查找變快,雖然排序浪費一些性能,但是查找變快很多
1.4.2 bypass模式的sortShuffle
bypass機制運行條件是shuffle map task數量小於spark.shuffle.sort.bypassMergeThreshold參數(默認值200)的值,且不是聚合類的shuffle算子(比如reduceByKey)
在 shuffleMapTask 數量 小於默認值200 時,啟用bypass模式的 sortShuffle,並沒有進行sort,原因是數據量本身比較少,沒必要進行sort全排序,因為數據量少本身查詢速度就快,正好省了sort的那部分性能開銷。
1.5 使用到的參數
1.5.1 spark.shuffle.file.buffer
buffer大小默認是32K,為了減少磁盤溢寫的次數,可以適當調整這個數值的大小。降低磁盤IO
1.5.2 spark.reducer.MaxSizeFlight
ReduceTask 拉取數據量的大小,默認48M
1.5.3 spark.shuffle.memoryFraction
shuffle聚合內存的比例,佔用executor內存比例的大小
1.5.4 spark.shuffle.io.maxRetries
拉取數據重試次數,防止網絡抖動帶來的影響
1.5.5 spark.shuffle.io.retryWait
調整到重試間隔時間,拉取失敗後多久才重新進行拉取
1.5.6 spark.shuffle.consolidateFiles
針對 HashShuffle 合併機制
1.5.7 spark.shuffle.sort.bypassMergeThreshold
SortShuffle bypass機制,默認200次
1.5.8 spark.sql.shuffle.partitions
默認200,shuffle時所使用到的分區數,也就是你生成的 part-00000,part-00001···最多也就只能 part-00199 了
閱讀更多 架構師師長 的文章