Spark 處理大數據的異常解決方案彙總

原文地址: https://www.indix.com/blog/engineering/lessons-from-using-spark-to-process-large-amounts-of-data-part-i/

spark處理數據的常見錯誤和解決辦法。

1. executor(執行器)的堆內存溢出。

錯誤信息:

– java.lang.OutOfMemoryError: Java heap space on the executors nodes

– java.lang.OutOfMemoryError: GC overhead limit exceeded

– org.apache.spark.shuffle.FetchFailedException

原因和解決辦法:

1) 執行處理的partitions 需要的內存超過分配的內存。

增加–executor memory 和 executor memory overhead

2)shuffle是很耗性能的操作,因為shuffle涉及磁盤IO,數據序列化和網絡傳輸。

儘量避免shuffle。shuffle 操作把數據寫在大量的partition中,而且不保證數據的均勻分佈。shuffle 的塊文件大小限制為2G。

3) 很多partition到最後都寫超過2G。考慮增加partition 數量,保證每個partition的數據不會超過限制。

4) 如果有數據傾斜問題,可以添加一些hash值到key的後面,可以減少數據傾斜的情況。shuffle完成後,需要增加一些聚合操作,去掉添加的hash值,返回正常的key。參考Spark性能優化指南——高級篇

5) 如果partition數量超過2000,spark會高度壓縮數據。如果任務需要大約2000個partition,可以repartition 為2001,存疑

6) 使用RDD時, 儘量避免使用groupByKey 操作。這樣的操作會在內存中保存key對應的所有值。數據量大會導致OOM,因為他們不會被分配到磁盤上。使用reduceByKey代替,它會首先做一個map端的聚合,減少傳輸到reducer的數據。

7) jion 兩個DataSet,其中一個非常小時,考慮使用broadcastjoin, broadcast小表。設置spark.sql.autoBroadcastJoinThreshold 的值大於小表的大小,或者強制使用:

left.join(broadcast(right), column)

8) 配置一個executor 對應多個core。既可以維持一定的並行度,又可以減少shuffle。shuffle取文件是executor之間的網絡傳輸。

9) 當GC時間太長時。檢測內存洩漏。

10) 為cache 數據分配足夠的內存,spark.memory.storageFraction。只在數據會被重複使用時才cache。

11) 選擇合適GC策略。G1GC更先進,更高的吞吐量和更低延遲。(不是大神不敢隨便動)

2. driver堆內存溢出。

錯誤信息

java.lang.OutOfMemoryError: Java heap space on the driver node

1) --driver-memory 增加內存值

2) 避免是collect 方法,如果要保存數據,可以保存到文件中。

3) 當app有很多的task 或者 partition 時,更容易引起OOM。每個task都會發送一個mapStatus對象到driver。當發生shuffle操作時,數據在excutor之間重新劃分。

4) shuffle stage 中 Spark為了避免重新計算,中間文件會寫入磁盤。發往driver的map status 包含文件的位置信息。太多的任務會導致driver需要接受和處理倍增的map,輸出reduce階段需要的數據。

3. 集群磁盤空間不足

錯誤信息

– java.io.IOException: No space left on device

1) Check the executors for logs like this `UnsafeExternalSorter: Thread 75 spilling sort data of 141.0 MB to disk (90 times so far)`. This is an indication that the storage data is continuously evicted to disk. Exceptions like this occur when data becomes larger than what is configured to be stored on the node

2) 確保spark.memory.fraction 不是太低。默認0.6的堆空間,設置大些,會增減執行內存和存儲數據的內存,減少溢出。

3) shuffle 會向磁盤寫數據。一般儘量避免數據溢出到磁盤,除非計算的代價高於讀寫磁盤。

4) 增加節點數。

5) 默認,溢出的數據和shuffle 操作的中間數據,會寫到/tmp. 可以通過spark.local.dirs 指定一個更大的目錄。

4. App 執行時間過長,或者 無限期的卡住,沒有展示處理。

1) 長時間執行的task,大多是數據傾斜造成的。解決方法是re-partition 確保數據均勻分佈在tasks中。

2) task 的分發機制,依據最低的位置層級(locality level)選擇executor. 儘管如此,被分配到一個Executor的task,也有可能被另外的空閒executor執行。這就導致這個task處理的數據需要通過網絡讀取,位置層級也會被改變。


分享到:


相關文章: