Spark 處理大數據時,數據傾斜的調優方案

數據傾斜是Spark作業運行時的一個最棘手的問題。數據傾斜調優,就是使用各種技術方案解決不同類型的數據傾斜問題,以保證Spark作業的性能。

Spark 處理大數據時,數據傾斜的調優方案

數據傾斜發生時的現象

  • 絕大多數task執行得都非常快,但個別task執行極慢。比如,總共有1000個task,997個task都在1分鐘之內執行完了,但是剩餘兩三個task卻要一兩個小時。這種情況很常見。
  • 原本能夠正常執行的Spark作業,某天突然報出OOM(內存溢出)異常,觀察異常棧,是我們寫的業務代碼造成的。這種情況比較少見。

數據傾斜發生的原理

數據傾斜只會在需要進行shuffle的操作中。Spark進行shuffle時,需要將相同key值的數據,拉取到同一個節點的其中一個任務來處理,比如:groupBy,Join等操作。

如果某個key或一些key的數據量特別大,就發生了數據傾斜問題。相同key的數據會在同一個task中處理,而task運行資源有限(受限於executor進程和網絡傳輸等),會使得這個任務運行非常緩慢,甚至導致內存溢出、網絡傳輸文件過大等異常。

這裡給大家羅列一些常用的並且可能會觸發shuffle操作的算子:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等。出現數據傾斜時,可能就是你的代碼中使用了這些算子中的某一個所導致的。

定位數據傾斜代碼

  1. 通過sparkui 查看是哪個stage運行慢,運行慢的stage中task處理的數據有沒有偏差很大。
  2. 通過理解代碼或者sql的DAG圖,和運行時的數據兩來判斷是哪一步發生數據傾斜。
  3. 找出數據傾斜的key值,rdd就用reduceByKey 方法統計key值的數據兩,sql就直接執行group by 就可以統計。在只統計個數的情況下,一般不會因為數據傾斜而導致運行異常,都是可以統計出來的。

解決方案

1 提高shuffle操作的並行度。

增加shuffle read task的數量,可以讓原本分配給一個task的多個key分配給多個task,從而讓每個task處理比原來更少的數據。

方案優點:實現起來比較簡單,可以有效緩解和減輕數據傾斜的影響。

方案缺點:

只是緩解了數據傾斜而已,沒有徹底根除問題,根據實踐經驗來看,其效果有限。

2 過濾數據量大的key

如果發現導致傾斜的key就少數幾個,而且對計算本身的影響並不大的話,那麼很適合使用這種方案。比如99%的key就對應10條數據,但是隻有一個key對應了100萬數據,從而導致了數據傾斜。

直接從RDD或者DataFrame中派出這些key就行。

3 兩階段聚合(局部聚合+全局聚合)

在進行對RDD執行reduceByKey等聚合類shuffle算子或者在Spark SQL中使用group by語句進行分組聚合時,比較適用。

方案實現原理:將原本相同的key通過附加隨機前綴的方式,變成多個不同的key,就可以讓原本被一個task處理的數據分散到多個task上去做局部聚合,進而解決單個task處理數據量過多的問題。接著去除掉隨機前綴,再次進行全局聚合,就可以得到最終的結果。具體原理見下圖。

Spark 處理大數據時,數據傾斜的調優方案

4 通過Broadcast廣播方式避免shuffle操作

如果RDD的join,或者sql中join語法,其中一個表比較小,最好小於1G, 可以通過Broadcast避免shuffle操作。RDD中需要創建Broadcast 變量,Spark sql 可以設置spark.sql.autoBroadcastJoinThreshold 的值,當join的表小於這個值時,自動通過廣播方式進行join。

5 傾斜key拆分

兩個RDD/Hive表進行join的時候,如果數據量都比較大,不能廣播。那麼此時可以看一下兩個RDD/Hive表中的key分佈情況。如果出現數據傾斜,是因為其中某一個RDD/Hive表中的少數幾個key的數據量過大,而另一個RDD/Hive表中的所有key都分佈比較均勻,那麼採用這個解決方案是比較合適的。實現思路:

  • 對包含少數幾個數據量過大的key的那個RDD,通過sample算子採樣出一份樣本來,然後統計一下每個key的數量,計算出來數據量最大的是哪幾個key。
  • 然後將這幾個key對應的數據從原來的RDD中拆分出來,形成一個單獨的RDD,並給每個key都打上n以內的隨機數作為前綴,而不會導致傾斜的大部分key形成另外一個RDD。
  • 接著將需要join的另一個RDD,也過濾出來那幾個傾斜key對應的數據並形成一個單獨的RDD,將每條數據膨脹成n條數據,這n條數據都按順序附加一個0~n的前綴,不會導致傾斜的大部分key也形成另外一個RDD。
  • 再將附加了隨機前綴的獨立RDD與另一個膨脹n倍的獨立RDD進行join,此時就可以將原先相同的key打散成n份,分散到多個task中去進行join了。
  • 而另外兩個普通的RDD就照常join即可。
  • 最後將兩次join的結果使用union算子合併起來即可,就是最終的join結果。

這種方式特別消耗內存,因為一份數據有會有n份副本。如果比較大key非常多,一定要慎重。


分享到:


相關文章: