Spark-關於Shuffle

一、關於Shuffle

**Shuffle**

,洗牌,。之所以需要Shuffle,還是因為具有某種共同特徵的一類數據需要最終匯聚(aggregate)到一個計算節點上進行計算。這些數據分佈在各個存儲節點上並且由不同節點的計算單元處理。以最簡單的WordCount 為例,其中數據保存在Node1、Node2和Node3;經過處理後,這些數據最終會匯聚到Node a、Node b處理,。 這個數據重新打亂,然後匯聚到不同節點的過程就是Shuffle,。

二、關於Shuffle調優

### 關於Spark Shuffle 概述

Spark是分佈式計算系統,數據塊在不同節點執行,但是一些操作,例如join,需要將不同節點上相同的Key對應的Value聚集到一起,Shuffle便應運而生。

Shuffle 是很耗資源的操作: **網絡IO****磁盤IO**(因為spark中Shuffle是一定落磁盤的)。Spark默認使用32KB的memory buffer存儲Shuffle的中間結果,如果buffer滿了就會寫入磁盤,生成一個小文件,每個Partition的多個小文件會在map端處理結束後合併為一個大文件,這些臨時文件會在對應的application結束後被刪除。

#### MapReduce

MapReduce的過程,大致如下

- read、compute and buffer in memory ,map task讀取數據,計算並將數據存儲在內存。

- sort、spill to disk and merge 內存不足,數據溢寫到磁盤,溢寫的數據是已經sort的,最後對溢寫的磁盤數據再進行一次merge sort。

- reduce fetch data and merge ,reduce task讀取map的輸出,並進行歸併排序。

- compute and output 對歸併後的數據進行計算並輸出。

#### 觸發Shuffle操作

以下三類:

- **repartition相關** repartition、coalesce

- **ByKey操作** groupByKey、reduceByKey、combineByKey、aggregateByKey等

- **join相關** cogroup、join

#### 實現

Spark中先後實現了三個Shuffle算法,如下

// Let the user specify short names for shuffle managers

// 讓用戶給shuffle managers指定一個短名稱

/** 這裡支持了兩種類型的shuffle

* 以前還有一種"hash"->"org.apache.spark.shuffle.hash.HashShuffleManager"

* 但是HashShuffleManager被取消使用了

* */

val shortShuffleMgrNames = Map(

"sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,

"tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)

hash shuffle(被取消了)、sort shuffle、tungsten-sort shuffle的實現細節

### 1、什麼是shuffle

每個Spark作業啟動運行的時候,首先Driver進程會將我們編寫的Spark作業代碼分拆為多個stage,每個stage執行一部分代碼片段,併為每個stage創建一批Task,然後將這些Task分配到各個Executor進程中執行。一個stage的所有Task都執行完畢之後,在各個executor節點上會產生大量的文件,這些文件會通過IO寫入磁盤(這些文件存放的時候這個stage計算得到的中間結果),然後Driver就會調度運行下一個stage。下一個stage的Task的輸入數據就是上一個stage輸出的中間結果。如此循環往復,直到程序執行完畢,最終得到我們想要的結果。Spark是根據shuffle類算子來進行stage的劃分。如果我們的代碼中執行了某個shuffle類算子(比如groupByKey、countByKey、reduceByKey、join等等)每當遇到這種類型的RDD算子的時候,劃分出一個stage界限來。

每個shuffle的前半部分stage的每個task都會創建出後半部分stage對應的task數量的文件,(注意是前半部分的每個task都會創建相同數量的文件)。shuffle的後半部分stage的task拉取前半部分stage中task產生的文件(這裡拉取的文件是:屬於自己task計算的那部分文件);然後每個task會有一個內存緩衝區,使用HashMap對值進行彙集;比如,task會對我們自己定義的聚合函數,如reduceByKey()算子,把所有的值進行累加,聚合出來得到最終的值,就完成了shuffle操作。

那麼默認的這種shuffle操作對性能有什麼影響嗎?

舉個例子;有100個節點,每個節點運行一個executor,每個executor有2個cpu core,總共有1000個task;那麼每個executor平均10個task。那麼每個節點將會輸出map端文件為:10 * 1000 = 10000;整個map端輸出的文件數:100 * 10000 = 100萬;shuffle中寫磁盤操作是最消耗性能的。那麼有什麼辦法可以降低文件個數的產生呢?先來看看下面這個圖

為了解決產生大量文件的問題,我們可以**在map端輸出的位置,將文件進行合併操作**,即使用

spark.shuffle.consolidateFiles 參數來合併文件,具體的使用方式為

new SparkConf().set("spark.shuffle.consolidateFiles","true")

開啟文件合併以後,我們map端輸出的文件會變為20萬左右,也就是說map端輸出的文件是原來默認的五分之一。所以說通過這個參數的設置,可以大大提升我們Spark作業的運行速度

### 關於map端內存緩衝和reduce端內存佔比的優化。

**什麼是map端內存緩衝區呢?**默認情況下,每個map端的task 輸出的一些中間結果在寫入磁盤之前,會先被寫入到一個臨時的內存緩衝區,這個緩衝區的默認大小為32kb,當內存緩衝區滿溢之後,才會將產生的中間結果spill到磁盤上。

**reduce端內存佔比又是什麼呢?**reduce端的task在拉取到數據之後,會用一個hashmap的數據結構對各個key對應的value進行匯聚操作。在進行匯聚操作的時候,其使用的內存是由executor進程給分配的,默認將executor的內存的20%分配給reduce task 進行聚合操作使用。這裡會有一個問題,當reduce task拉取的數據很多導致其分配的內存放不下的時候,這個時候會將放不下的數據全部spill到磁盤上去。

為了解決map端數據滿溢引發的spill和reduce端數據過大引發的spill操作。我們可以通過兩個參數來適當調整,以避免上述情況的出現,這個兩個參數分別是:

spark.shuffle.file.buffer #map task的內存緩衝調節參數,默認是32kb

spark.shuffle.memoryFraction #reduce端聚合內存佔比,默認0.2

### 怎麼判斷在什麼時候對這兩個參數進行調整呢?

通過監控平臺查看每個executor的task的shuffle write和shuffle read的運行次數,如果發現這個指標的運行次數比較多,那麼就應該考慮這兩個參數的調整了;這個參數調整有一個前提,spark.shuffle.file.buffer參數每次擴大一倍的方式進行調整,spark.shuffle.memoryFraction參數每次增加0.1進行調整。


分享到:


相關文章: