Spark的Shuffle總結分析

一、shuffle原理分析

1.1 shuffle概述

一、shuffle原理分析

1.1 shuffle概述

Shuffle就是對數據進行重組,由於分佈式計算的特性和要求,在實現細節上更加繁瑣和複雜。


Spark的Shuffle總結分析


在MapReduce框架,Shuffle是連接Map和Reduce之間的橋樑,Map階段通過shuffle讀取數據並輸出到對應的Reduce,而Reduce階段負責從Map端拉取數據並進行計算。在整個shuffle過程中,往往伴隨著大量的磁盤和網絡I/O。所以shuffle性能的高低也直接決定了整個程序的性能高低。而Spark也會有自己的shuffle實現過程。

1.2 Spark中的 shuffle 介紹

在DAG調度的過程中,Stage 階段的劃分是根據是否有shuffle過程,也就是存在 寬依賴 的時候,需要進行shuffle,這時候會將 job 劃分成多個Stage,每一個 Stage 內部有很多可以並行運行的 Task

stage與stage之間的過程就是 shuffle 階段,在 Spark 中,負責 shuffle 過程的執行、計算和處理的組件主要就是 ShuffleManager 。ShuffleManager 隨著Spark的發展有兩種實現的方式,分別為 HashShuffleManagerSortShuffleManager ,因此spark的Shuffle有 Hash Shuffle

Sort Shuffle 兩種。

1.3 HashShuffle機制

1.3.1 HashShuffle 的介紹

Spark 1.2 以前,默認的shuffle計算引擎是 HashShuffleManager

HashShuffleManager 有著一個非常嚴重的弊端,就是會產生大量的中間磁盤文件,進而由大量的磁盤IO操作影響了性能。因此在Spark 1.2以後的版本中,默認的 ShuffleManager 改成了 SortShuffleManager

SortShuffleManager 相較於 HashShuffleManager 來說,有了一定的改進。主要就在於每個Task在進行shuffle操作時,雖然也會產生較多的臨時磁盤文件,但是最後會將所有的臨時文件合併(merge)成一個磁盤文件,因此每個 Task 就只有一個磁盤文件。在下一個 Stage 的shuffle read task拉取自己的數據時,只要根據索引讀取每個磁盤文件中的部分數據即可。

Hash shuffle是不具有排序的Shuffle。

1.3.2 普通機制的Hash shuffle

HashShuffleManager的運行機制主要分成兩種:一種是 普通運行機制 ,另一種是 合併運行機制 ,而合併機制主要是通過複用buffer來優化Shuffle過程中產生的小文件的數量。


Spark的Shuffle總結分析


先簡單說明一下情況。此時任務劃分為了兩個 Stage ,第一個 Stage 最上方有4個 MapTask , 而第二個 Stage 有3個 ReduceTask,但是如果我們現在的MapTask增多成1000個,那我們所產生的 block file 那不就有 MapTask*3 這麼多了,在這時大量的IO操作會造成很大的性能問題

1.3.3 普通機制的 Hash shuffle 的步驟詳細說明

這裡我們先明確一個假設前提:每個Executor只有1個CPU core,也就是說,無論這個Executor上分配多少個task線程,同一時間都只能執行一個task線程

圖中有3個ReduceTask,從ShuffleMapTask 開始那邊各自把自己進行 Hash 計算(分區器:hash/numReduce取模),分類出3個不同的類別,每個 ShuffleMapTask 都分成3種類別的數據,想把不同的數據匯聚然後計算出最終的結果,所以ReduceTask 會在屬於自己類別的數據收集過來,匯聚成一個同類別的大集合,每1個 ShuffleMapTask 輸出3份本地文件,這裡有4個 ShuffleMapTask,所以總共輸出了4 x 3個分類文件 = 12個本地小文件。

Shuffle Write 階段:

主要就是在一個stage結束計算之後,為了下一個stage可以執行shuffle類的算子(比如reduceByKey,groupByKey),而將每個task處理的數據按key進行分區。所謂 “分區”,就是對相同的key執行hash算法,從而將相同key都寫入同一個磁盤文件中,而每一個磁盤文件都只屬於reduce端的stage的一個task。在將數據寫入磁盤之前,會先將數據寫入內存緩衝中,當內存緩衝填滿之後,才會溢寫到磁盤文件中去。

那麼每個執行 Shuffle Write 的 Task,要為下一個 Stage 創建多少個磁盤文件呢? 很簡單,下一個stage的task有多少個,當前stage的每個task就要創建多少份磁盤文件。比如下一個stage總共有100個task,那麼當前stage的每個task都要創建100份磁盤文件。如果當前stage有50個task,總共有10個Executor,每個Executor執行5個Task,那麼每個Executor上總共就要創建500個磁盤文件,所有Executor上會創建5000個磁盤文件。由此可見,未經優化的shuffle write操作所產生的磁盤文件的數量是極其驚人的。

Shuffle Read 階段:

Shuffle Read,通常就是一個stage剛開始時要做的事情。此時該stage的每一個task就需要將上一個stage的計算結果中的所有相同key,從各個節點上通過網絡都拉取到自己所在的節點上,然後進行key的聚合或連接等操作。由於shuffle write的過程中,task給Reduce端的stage的每個task都創建了一個磁盤文件,因此shuffle read的過程中,每個task只要從上游stage的所有task所在節點上,拉取屬於自己的那一個磁盤文件即可。

Shuffle Read的拉取過程是一邊拉取一邊進行聚合的。每個shuffle read task都會有一個自己的buffer緩衝,每次都只能拉取與buffer緩衝相同大小的數據,然後通過內存中的一個Map進行聚合等操作。聚合完一批數據後,再拉取下一批數據,並放到buffer緩衝中進行聚合操作。以此類推,直到最後將所有數據到拉取完,並得到最終的結果。

注意

  1. buffer起到的是緩存作用,緩存能夠加速寫磁盤,提高計算的效率,buffer的默認大小32k。
  2. 分區器:根據hash/numRedcue取模決定數據由幾個Reduce處理,也決定了寫入幾個buffer中
  3. block file:磁盤小文件,從圖中我們可以知道磁盤小文件的個數計算公式:block file=M*R 。 M為map task的數量,R為Reduce的數量,一般Reduce的數量等於buffer的數量,都是由分區器決定的

Hash shuffle普通機制的問題

  1. Shuffle階段在磁盤上會產生海量的小文件,建立通信和拉取數據的次數變多,此時會產生大量耗時低效的 IO 操作 (因為產生過多的小文件)
  2. 可能導致 OOM,大量耗時低效的 IO 操作 ,導致寫磁盤時的對象過多,讀磁盤時候的對象也過多,這些對象存儲在堆內存中,會導致堆內存不足,相應會導致頻繁的GC,GC會導致OOM。由於內存中需要保存海量文件操作句柄和臨時信息,如果數據處理的規模比較龐大的話,內存不可承受,會出現 OOM 等問題

1.3.4 合併機制的Hash shuffle

合併機制就是複用buffer緩衝區,開啟合併機制的配置是spark.shuffle.consolidateFiles。該參數默認值為false,將其設置為true即可開啟優化機制。通常來說,如果我們使用HashShuffleManager,那麼都建議開啟這個選項。


Spark的Shuffle總結分析


這裡有6個這裡有6個shuffleMapTask,數據類別還是分成3種類型,因為Hash算法會根據你的 Key 進行分類,在同一個進程中,無論是有多少過Task,都會把同樣的Key放在同一個Buffer裡,然後把Buffer中的數據寫入以Core數量為單位的本地文件中,(一個Core只有一種類型的Key的數據),每1個Task所在的進程中,分別寫入共同進程中的3份本地文件,這裡有6個shuffleMapTasks,所以總共輸出是 2個Cores x 3個分類文件 = 6個本地小文件。

此時block file = Core * R ,Core為CPU的核數,R為Reduce的數量,但是如果 Reducer 端的並行任務或者是數據分片過多的話則 Core * Reducer Task 依舊過大,也會產生很多小文件。

1.4 Sort shuffle

SortShuffleManager的運行機制也是主要分成兩種,普通運行機制bypass運行機制

1.4.1 Sort shuffle 的普通機制


Spark的Shuffle總結分析


在該模式下,數據會先寫入一個數據結構,聚合算子寫入 Map,一邊通過 Map 局部聚合,一遍寫入內存。Join 算子寫入 ArrayList 直接寫入 內存 中。然後需要判斷是否達到 閾值(5M),如果達到就會將內存數據結構的數據寫入到磁盤,清空內存數據結構。

在溢寫磁盤前,先根據 key 進行 排序,排序 過後的數據,會分批寫入到磁盤文件中。默認批次為10000條,數據會以每批一萬條寫入到磁盤文件。寫入磁盤文件通過緩衝區溢寫的方式,每次溢寫都會產生一個磁盤文件,也就是說一個task過程會產生多個臨時文件。

最後在每個task中,將所有的臨時文件合併,這就是 merge 過程,此過程將所有臨時文件讀取出來,一次寫入到最終文件。意味著一個task的所有數據都在這一個文件中。同時單獨寫一份索引文件,標識下游各個task的數據在文件中的索引start offset和end offset(比如對於wordCount,下標從哪裡(start offset)到哪裡(end offset)是這個單詞)。

這個機制的好處:

  1. 小文件明顯變少了,一個task只生成一個file文件
  2. file文件整體有序,加上索引文件的輔助,查找變快,雖然排序浪費一些性能,但是查找變快很多

1.4.2 bypass模式的sortShuffle

bypass機制運行條件是shuffle map task數量小於spark.shuffle.sort.bypassMergeThreshold參數(默認值200)的值,且不是聚合類的shuffle算子(比如reduceByKey)


Spark的Shuffle總結分析


在 shuffleMapTask 數量 小於默認值200 時,啟用bypass模式的 sortShuffle,並沒有進行sort,原因是數據量本身比較少,沒必要進行sort全排序,因為數據量少本身查詢速度就快,正好省了sort的那部分性能開銷。

1.5 使用到的參數

1.5.1 spark.shuffle.file.buffer

buffer大小默認是32K,為了減少磁盤溢寫的次數,可以適當調整這個數值的大小。降低磁盤IO

1.5.2 spark.reducer.MaxSizeFlight

ReduceTask 拉取數據量的大小,默認48M

1.5.3 spark.shuffle.memoryFraction

shuffle聚合內存的比例,佔用executor內存比例的大小

1.5.4 spark.shuffle.io.maxRetries

拉取數據重試次數,防止網絡抖動帶來的影響

1.5.5 spark.shuffle.io.retryWait

調整到重試間隔時間,拉取失敗後多久才重新進行拉取

1.5.6 spark.shuffle.consolidateFiles

針對 HashShuffle 合併機制

1.5.7 spark.shuffle.sort.bypassMergeThreshold

SortShuffle bypass機制,默認200次

1.5.8 spark.sql.shuffle.partitions

默認200,shuffle時所使用到的分區數,也就是你生成的 part-00000,part-00001···最多也就只能 part-00199 了


Shuffle就是對數據進行重組,由於分佈式計算的特性和要求,在實現細節上更加繁瑣和複雜。


Spark的Shuffle總結分析


在MapReduce框架,Shuffle是連接Map和Reduce之間的橋樑,Map階段通過shuffle讀取數據並輸出到對應的Reduce,而Reduce階段負責從Map端拉取數據並進行計算。在整個shuffle過程中,往往伴隨著大量的磁盤和網絡I/O。所以shuffle性能的高低也直接決定了整個程序的性能高低。而Spark也會有自己的shuffle實現過程。

1.2 Spark中的 shuffle 介紹

在DAG調度的過程中,Stage 階段的劃分是根據是否有shuffle過程,也就是存在 寬依賴 的時候,需要進行shuffle,這時候會將 job 劃分成多個Stage,每一個 Stage 內部有很多可以並行運行的 Task

stage與stage之間的過程就是 shuffle 階段,在 Spark 中,負責 shuffle 過程的執行、計算和處理的組件主要就是 ShuffleManager 。ShuffleManager 隨著Spark的發展有兩種實現的方式,分別為 HashShuffleManagerSortShuffleManager ,因此spark的Shuffle有 Hash Shuffle

Sort Shuffle 兩種。

1.3 HashShuffle機制

1.3.1 HashShuffle 的介紹

Spark 1.2 以前,默認的shuffle計算引擎是 HashShuffleManager

HashShuffleManager 有著一個非常嚴重的弊端,就是會產生大量的中間磁盤文件,進而由大量的磁盤IO操作影響了性能。因此在Spark 1.2以後的版本中,默認的 ShuffleManager 改成了 SortShuffleManager

SortShuffleManager 相較於 HashShuffleManager 來說,有了一定的改進。主要就在於每個Task在進行shuffle操作時,雖然也會產生較多的臨時磁盤文件,但是最後會將所有的臨時文件合併(merge)成一個磁盤文件,因此每個 Task 就只有一個磁盤文件。在下一個 Stage 的shuffle read task拉取自己的數據時,只要根據索引讀取每個磁盤文件中的部分數據即可。

Hash shuffle是不具有排序的Shuffle。

1.3.2 普通機制的Hash shuffle

HashShuffleManager的運行機制主要分成兩種:一種是 普通運行機制 ,另一種是 合併運行機制 ,而合併機制主要是通過複用buffer來優化Shuffle過程中產生的小文件的數量。


Spark的Shuffle總結分析


先簡單說明一下情況。此時任務劃分為了兩個 Stage ,第一個 Stage 最上方有4個 MapTask , 而第二個 Stage 有3個 ReduceTask,但是如果我們現在的MapTask增多成1000個,那我們所產生的 block file 那不就有 MapTask*3 這麼多了,在這時大量的IO操作會造成很大的性能問題

1.3.3 普通機制的 Hash shuffle 的步驟詳細說明

這裡我們先明確一個假設前提:每個Executor只有1個CPU core,也就是說,無論這個Executor上分配多少個task線程,同一時間都只能執行一個task線程

圖中有3個ReduceTask,從ShuffleMapTask 開始那邊各自把自己進行 Hash 計算(分區器:hash/numReduce取模),分類出3個不同的類別,每個 ShuffleMapTask 都分成3種類別的數據,想把不同的數據匯聚然後計算出最終的結果,所以ReduceTask 會在屬於自己類別的數據收集過來,匯聚成一個同類別的大集合,每1個 ShuffleMapTask 輸出3份本地文件,這裡有4個 ShuffleMapTask,所以總共輸出了4 x 3個分類文件 = 12個本地小文件。

Shuffle Write 階段:

主要就是在一個stage結束計算之後,為了下一個stage可以執行shuffle類的算子(比如reduceByKey,groupByKey),而將每個task處理的數據按key進行分區。所謂 “分區”,就是對相同的key執行hash算法,從而將相同key都寫入同一個磁盤文件中,而每一個磁盤文件都只屬於reduce端的stage的一個task。在將數據寫入磁盤之前,會先將數據寫入內存緩衝中,當內存緩衝填滿之後,才會溢寫到磁盤文件中去。

那麼每個執行 Shuffle Write 的 Task,要為下一個 Stage 創建多少個磁盤文件呢? 很簡單,下一個stage的task有多少個,當前stage的每個task就要創建多少份磁盤文件。比如下一個stage總共有100個task,那麼當前stage的每個task都要創建100份磁盤文件。如果當前stage有50個task,總共有10個Executor,每個Executor執行5個Task,那麼每個Executor上總共就要創建500個磁盤文件,所有Executor上會創建5000個磁盤文件。由此可見,未經優化的shuffle write操作所產生的磁盤文件的數量是極其驚人的。

Shuffle Read 階段:

Shuffle Read,通常就是一個stage剛開始時要做的事情。此時該stage的每一個task就需要將上一個stage的計算結果中的所有相同key,從各個節點上通過網絡都拉取到自己所在的節點上,然後進行key的聚合或連接等操作。由於shuffle write的過程中,task給Reduce端的stage的每個task都創建了一個磁盤文件,因此shuffle read的過程中,每個task只要從上游stage的所有task所在節點上,拉取屬於自己的那一個磁盤文件即可。

Shuffle Read的拉取過程是一邊拉取一邊進行聚合的。每個shuffle read task都會有一個自己的buffer緩衝,每次都只能拉取與buffer緩衝相同大小的數據,然後通過內存中的一個Map進行聚合等操作。聚合完一批數據後,再拉取下一批數據,並放到buffer緩衝中進行聚合操作。以此類推,直到最後將所有數據到拉取完,並得到最終的結果。

注意

  1. buffer起到的是緩存作用,緩存能夠加速寫磁盤,提高計算的效率,buffer的默認大小32k。
  2. 分區器:根據hash/numRedcue取模決定數據由幾個Reduce處理,也決定了寫入幾個buffer中
  3. block file:磁盤小文件,從圖中我們可以知道磁盤小文件的個數計算公式:block file=M*R 。 M為map task的數量,R為Reduce的數量,一般Reduce的數量等於buffer的數量,都是由分區器決定的

Hash shuffle普通機制的問題

  1. Shuffle階段在磁盤上會產生海量的小文件,建立通信和拉取數據的次數變多,此時會產生大量耗時低效的 IO 操作 (因為產生過多的小文件)
  2. 可能導致 OOM,大量耗時低效的 IO 操作 ,導致寫磁盤時的對象過多,讀磁盤時候的對象也過多,這些對象存儲在堆內存中,會導致堆內存不足,相應會導致頻繁的GC,GC會導致OOM。由於內存中需要保存海量文件操作句柄和臨時信息,如果數據處理的規模比較龐大的話,內存不可承受,會出現 OOM 等問題

1.3.4 合併機制的Hash shuffle

合併機制就是複用buffer緩衝區,開啟合併機制的配置是spark.shuffle.consolidateFiles。該參數默認值為false,將其設置為true即可開啟優化機制。通常來說,如果我們使用HashShuffleManager,那麼都建議開啟這個選項。


Spark的Shuffle總結分析


這裡有6個這裡有6個shuffleMapTask,數據類別還是分成3種類型,因為Hash算法會根據你的 Key 進行分類,在同一個進程中,無論是有多少過Task,都會把同樣的Key放在同一個Buffer裡,然後把Buffer中的數據寫入以Core數量為單位的本地文件中,(一個Core只有一種類型的Key的數據),每1個Task所在的進程中,分別寫入共同進程中的3份本地文件,這裡有6個shuffleMapTasks,所以總共輸出是 2個Cores x 3個分類文件 = 6個本地小文件。

此時block file = Core * R ,Core為CPU的核數,R為Reduce的數量,但是如果 Reducer 端的並行任務或者是數據分片過多的話則 Core * Reducer Task 依舊過大,也會產生很多小文件。

1.4 Sort shuffle

SortShuffleManager的運行機制也是主要分成兩種,普通運行機制bypass運行機制

1.4.1 Sort shuffle 的普通機制


Spark的Shuffle總結分析


在該模式下,數據會先寫入一個數據結構,聚合算子寫入 Map,一邊通過 Map 局部聚合,一遍寫入內存。Join 算子寫入 ArrayList 直接寫入 內存 中。然後需要判斷是否達到 閾值(5M),如果達到就會將內存數據結構的數據寫入到磁盤,清空內存數據結構。

在溢寫磁盤前,先根據 key 進行 排序,排序 過後的數據,會分批寫入到磁盤文件中。默認批次為10000條,數據會以每批一萬條寫入到磁盤文件。寫入磁盤文件通過緩衝區溢寫的方式,每次溢寫都會產生一個磁盤文件,也就是說一個task過程會產生多個臨時文件。

最後在每個task中,將所有的臨時文件合併,這就是 merge 過程,此過程將所有臨時文件讀取出來,一次寫入到最終文件。意味著一個task的所有數據都在這一個文件中。同時單獨寫一份索引文件,標識下游各個task的數據在文件中的索引start offset和end offset(比如對於wordCount,下標從哪裡(start offset)到哪裡(end offset)是這個單詞)。

這個機制的好處:

  1. 小文件明顯變少了,一個task只生成一個file文件
  2. file文件整體有序,加上索引文件的輔助,查找變快,雖然排序浪費一些性能,但是查找變快很多

1.4.2 bypass模式的sortShuffle

bypass機制運行條件是shuffle map task數量小於spark.shuffle.sort.bypassMergeThreshold參數(默認值200)的值,且不是聚合類的shuffle算子(比如reduceByKey)


Spark的Shuffle總結分析


在 shuffleMapTask 數量 小於默認值200 時,啟用bypass模式的 sortShuffle,並沒有進行sort,原因是數據量本身比較少,沒必要進行sort全排序,因為數據量少本身查詢速度就快,正好省了sort的那部分性能開銷。

1.5 使用到的參數

1.5.1 spark.shuffle.file.buffer

buffer大小默認是32K,為了減少磁盤溢寫的次數,可以適當調整這個數值的大小。降低磁盤IO

1.5.2 spark.reducer.MaxSizeFlight

ReduceTask 拉取數據量的大小,默認48M

1.5.3 spark.shuffle.memoryFraction

shuffle聚合內存的比例,佔用executor內存比例的大小

1.5.4 spark.shuffle.io.maxRetries

拉取數據重試次數,防止網絡抖動帶來的影響

1.5.5 spark.shuffle.io.retryWait

調整到重試間隔時間,拉取失敗後多久才重新進行拉取

1.5.6 spark.shuffle.consolidateFiles

針對 HashShuffle 合併機制

1.5.7 spark.shuffle.sort.bypassMergeThreshold

SortShuffle bypass機制,默認200次

1.5.8 spark.sql.shuffle.partitions

默認200,shuffle時所使用到的分區數,也就是你生成的 part-00000,part-00001···最多也就只能 part-00199 了


分享到:


相關文章: