Spark 數據傾斜解決方案

Spark中的數據傾斜問題主要指shuffle過程中出現的數據傾斜問題,是由於不同的key對應的數據量不同導致的不同task所處理的數據量不同的問題。

例如,reduce點一共要處理100萬條數據,第一個和第二個task分別被分配到了1萬條數據,計算5分鐘內完成,第三個task分配到了98萬數據,此時第三個task可能需要10個小時完成,這使得整個Spark作業需要10個小時才能運行完成,這就是數據傾斜所帶來的後果。

注意,要區分開數據傾斜與數據量過量這兩種情況,數據傾斜是指少數task被分配了絕大多數的數據,因此少數task運行緩慢;數據過量是指所有task被分配的數據量都很大,相差不多,所有task都運行緩慢。

數據傾斜的表現:

  1. Spark作業的大部分task都執行迅速,只有有限的幾個task執行的非常慢,此時可能出現了數據傾斜,作業可以運行,但是運行得非常慢;
  2. Spark作業的大部分task都執行迅速,但是有的task在運行過程中會突然報出OOM,反覆執行幾次都在某一個task報出OOM錯誤,此時可能出現了數據傾斜,作業無法正常運行。

定位數據傾斜問題:

  1. 查閱代碼中的shuffle算子,例如reduceByKey、countByKey、groupByKey、join等算子,根據代碼邏輯判斷此處是否會出現數據傾斜;
  2. 查看Spark作業的log文件,log文件對於錯誤的記錄會精確到代碼的某一行,可以根據異常定位到的代碼位置來明確錯誤發生在第幾個stage,對應的shuffle算子是哪一個(通過yarn8088登陸去查看log跳轉到spark日誌)

1 解決方案一:聚合原數據

  1. 避免shuffle過程·

絕大多數情況下,Spark作業的數據來源都是Hive表,這些Hive表基本都是經過ETL之後的昨天的數據。

為了避免數據傾斜,我們可以考慮避免shuffle過程,如果避免了shuffle過程,那麼從根本上就消除了發生數據傾斜問題的可能。

如果Spark作業的數據來源於Hive表,那麼可以先在Hive表中對數據進行聚合,例如按照key進行分組,將同一key對應的所有value用一種特殊的格式拼接到一個字符串裡去,這樣,一個key就只有一條數據了;之後,對一個key的所有value進行處理時,只需要進行map操作即可,無需再進行任何的shuffle操作。通過上述方式就避免了執行shuffle操作,也就不可能會發生任何的數據傾斜問題。

對於Hive表中數據的操作,不一定是拼接成一個字符串,也可以是直接對key的每一條數據進行累計計算。

要區分開,處理的數據量大和數據傾斜的區別

  1. 縮小key粒度(增大數據傾斜可能性,降低每個task的數據量)

key的數量增加,可能使數據傾斜更嚴重。

  1. 增大key粒度(減小數據傾斜可能性,增大每個task的數據量)

如果沒有辦法對每個key聚合出來一條數據,在特定場景下,可以考慮擴大key的聚合粒度。

例如,目前有10萬條用戶數據,當前key的粒度是(省,城市,區,日期),現在我們考慮擴大粒度,將key的粒度擴大為(省,城市,日期),這樣的話,key的數量會減少,key之間的數據量差異也有可能會減少,由此可以減輕數據傾斜的現象和問題。(此方法只針對特定類型的數據有效,當應用場景不適宜時,會加重數據傾斜)

2 解決方案二:過濾導致傾斜的key

如果在Spark作業中允許丟棄某些數據,那麼可以考慮將可能導致數據傾斜的key進行過濾,濾除可能導致數據傾斜的key對應的數據,這樣,在Spark作業中就不會發生數據傾斜了。

比如,數據中的key的值中會有null,且這樣的數據很多,那麼在shuffle時會將key為null的數據放到一個task中執行,那麼這個task肯定會執行的很慢,會傾斜。解決方式是給值為null的key添加隨機值,目的是把這樣的數據打散。

3 解決方案三:提高shuffle操作中的reduce並行度

當方案一和方案二對於數據傾斜的處理沒有很好的效果時,可以考慮提高shuffle過程中的reduce端並行度,reduce端並行度的提高就增加了reduce端task的數量,那麼每個task分配到的數據量就會相應減少,由此緩解數據傾斜問題。

  1. reduce端並行度的設置

在大部分的shuffle算子中,都可以傳入一個並行度的設置參數,比如reduceByKey(500),這個參數會決定shuffle過程中reduce端的並行度,在進行shuffle操作的時候,就會對應著創建指定數量的reduce task。對於Spark SQL中的shuffle類語句,比如group by、join等,需要設置一個參數,即spark.sql.shuffle.partitions,該參數代表了shuffle read task的並行度,該值默認是200,對於很多場景來說都有點過小。

增加shuffle read task的數量,可以讓原本分配給一個task的多個key分配給多個task,從而讓每個task處理比原來更少的數據。舉例來說,如果原本有5個key,每個key對應10條數據,這5個key都是分配給一個task的,那麼這個task就要處理50條數據。而增加了shuffle read task以後,每個task就分配到一個key,即每個task就處理10條數據,那麼自然每個task的執行時間都會變短了。本質是達到一個task處理一個key的數據。

  1. reduce端並行度設置存在的缺陷

提高reduce端並行度並沒有從根本上改變數據傾斜的本質和問題(方案一和方案二從根本上避免了數據傾斜的發生),只是儘可能地去緩解和減輕shuffle reduce task的數據壓力,以及數據傾斜的問題,適用於有較多key對應的數據量都比較大的情況。

該方案通常無法徹底解決數據傾斜,因為如果出現一些極端情況,比如某個key對應的數據量有100萬,那麼無論你的task數量增加到多少,這個對應著100萬數據的key肯定還是會分配到一個task中去處理,因此註定還是會發生數據傾斜的。所以這種方案只能說是在發現數據傾斜時嘗試使用的第一種手段,嘗試去用最簡單的方法緩解數據傾斜而已,或者是和其他方案結合起來使用。

在理想情況下,reduce端並行度提升後,會在一定程度上減輕數據傾斜的問題,甚至基本消除數據傾斜;但是,在一些情況下,只會讓原來由於數據傾斜而運行緩慢的task運行速度稍有提升,或者避免了某些task的OOM問題,但是,仍然運行緩慢,此時,要及時放棄方案三,開始嘗試後面的方案。

4 解決方案四:使用隨機key實現雙重聚合

當使用了類似於groupByKey、reduceByKey這樣的算子時,可以考慮使用隨機key實現雙重聚合,如圖3-1所示:

Spark 數據傾斜解決方案

圖3-1 隨機key實現雙重聚合

首先,通過map算子給每個數據的key添加隨機數前綴,對key進行打散,將原先一樣的key變成不一樣的key,然後進行第一次聚合,這樣就可以讓原本被一個task處理的數據分散到多個task上去做局部聚合;隨後,去除掉每個key的前綴,再次進行聚合。

此方法對於由groupByKey、reduceByKey這類算子造成的數據傾斜由比較好的效果,僅僅適用於聚合類的shuffle操作,適用範圍相對較窄。如果是join類的shuffle操作,還得用其他的解決方案。

此方法也是前幾種方案沒有比較好的效果時要嘗試的解決方案。

5 解決方案五:將reduce join轉換為map join

正常情況下,join操作都會執行shuffle過程,並且執行的是reduce join,也就是先將所有相同的key和對應的value匯聚到一個reduce task中,然後再進行join。普通join的過程如下圖所示:

Spark 數據傾斜解決方案

圖3-2 普通join過程

普通的join是會走shuffle過程的,而一旦shuffle,就相當於會將相同key的數據拉取到一個shuffle read task中再進行join,此時就是reduce join。但是如果一個RDD是比較小的,則可以採用廣播小RDD全量數據+map算子來實現與join同樣的效果,也就是map join,此時就不會發生shuffle操作,也就不會發生數據傾斜。

注意,RDD是並不能進行廣播的,只能將RDD內部的數據通過collect拉取到Driver內存然後再進行廣播

  1. 核心思路:

不使用join算子進行連接操作,而使用Broadcast變量與map類算子實現join操作,進而完全規避掉shuffle類的操作,徹底避免數據傾斜的發生和出現。將較小RDD中的數據直接通過collect算子拉取到Driver端的內存中來,然後對其創建一個Broadcast變量;接著對另外一個RDD執行map類算子,在算子函數內,從Broadcast變量中獲取較小RDD的全量數據,與當前RDD的每一條數據按照連接key進行比對,如果連接key相同的話,那麼就將兩個RDD的數據用你需要的方式連接起來。

根據上述思路,根本不會發生shuffle操作,從根本上杜絕了join操作可能導致的數據傾斜問題。

當join操作有數據傾斜問題並且其中一個RDD的數據量較小時,可以優先考慮這種方式,效果非常好。其實這種方式如同hive中的map join優化,即 /*+ mapjoin(table) */ 寫法,所以思想很多時候都是一樣的。

map join的過程如圖3-3所示:

Spark 數據傾斜解決方案

圖3-3 map join過程

  1. 不適用場景分析:

由於Spark的廣播變量是在每個Executor中保存一個副本,如果兩個RDD數據量都比較大,那麼如果將一個數據量比較大的 RDD做成廣播變量,那麼很有可能會造成內存溢出。

6 解決方案六:sample採樣對傾斜key單獨進行join

在Spark中,如果某個RDD只有一個key,那麼在shuffle過程中會默認將此key對應的數據打散,由不同的reduce端task進行處理。

當由單個key導致數據傾斜時,可有將發生數據傾斜的key單獨提取出來,組成一個RDD,然後用這個原本會導致傾斜的key組成的RDD根其他RDD單獨join,此時,根據Spark的運行機制,此RDD中的數據會在shuffle階段被分散到多個task中去進行join操作。傾斜key單獨join的流程如圖3-4所示:

Spark 數據傾斜解決方案

圖3-4 傾斜key單獨join流程

  1. 適用場景分析:

對於RDD中的數據,可以將其轉換為一箇中間表,或者是直接使用countByKey()的方式,看一個這個RDD中各個key對應的數據量,此時如果你發現整個RDD就一個key的數據量特別多,那麼就可以考慮使用這種方法。

當數據量非常大時,可以考慮使用sample採樣獲取10%的數據,然後分析這10%的數據中哪個key可能會導致數據傾斜,然後將這個key對應的數據單獨提取出來。

  1. 不適用場景分析:

如果一個RDD中導致數據傾斜的key很多,那麼此方案不適用。

7 解決方案七:使用隨機數以及擴容進行join

如果在進行join操作時,RDD中有大量的key導致數據傾斜,那麼進行分拆key也沒什麼意義,此時就只能使用最後一種方案來解決問題了,對於join操作,我們可以考慮對其中一個RDD數據進行擴容,另一個RDD進行稀釋後再join。

我們會將原先一樣的key通過附加隨機前綴變成不一樣的key,然後就可以將這些處理後的“不同key”分散到多個task中去處理,而不是讓一個task處理大量的相同key。這一種方案是針對有大量傾斜key的情況,沒法將部分key拆分出來進行單獨處理,需要對整個RDD進行數據擴容,對內存資源要求很高。

  1. 核心思想:

選擇一個RDD,使用flatMap進行擴容,對每條數據的key添加數值前綴(1~N的數值),將一條數據映射為多條數據;(擴容)

選擇另外一個RDD,進行map映射操作,每條數據的key都打上一個隨機數作為前綴(1~N的隨機數);(稀釋)

將兩個處理後的RDD,進行join操作。

Spark 數據傾斜解決方案

圖3-6 使用隨機數以及擴容進行join

  1. 侷限性:

如果兩個RDD都很大,那麼將RDD進行N倍的擴容顯然行不通;

使用擴容的方式只能緩解數據傾斜,不能徹底解決數據傾斜問題。

  1. 使用方案七對方案六進一步優化分析:

當RDD中有幾個key導致數據傾斜時,方案六不再適用,而方案七又非常消耗資源,此時可以引入方案七的思想完善方案六:

  1. 對包含少數幾個數據量過大的key的那個RDD,通過sample算子採樣出一份樣本來,然後統計一下每個key的數量,計算出來數據量最大的是哪幾個key。
  2. 然後將這幾個key對應的數據從原來的RDD中拆分出來,形成一個單獨的RDD,並給每個key都打上n以內的隨機數作為前綴,而不會導致傾斜的大部分key形成另外一個RDD。
  3. 接著將需要join的另一個RDD,也過濾出來那幾個傾斜key對應的數據並形成一個單獨的RDD,將每條數據膨脹成n條數據,這n條數據都按順序附加一個0~n的前綴,不會導致傾斜的大部分key也形成另外一個RDD。
  4. 再將附加了隨機前綴的獨立RDD與另一個膨脹n倍的獨立RDD進行join,此時就可以將原先相同的key打散成n份,分散到多個task中去進行join了。
  5. 而另外兩個普通的RDD就照常join即可。
  6. 最後將兩次join的結果使用union算子合併起來即可,就是最終的join結果。


分享到:


相關文章: