Spark 处理大数据的异常解决方案汇总

原文地址: https://www.indix.com/blog/engineering/lessons-from-using-spark-to-process-large-amounts-of-data-part-i/

spark处理数据的常见错误和解决办法。

1. executor(执行器)的堆内存溢出。

错误信息:

– java.lang.OutOfMemoryError: Java heap space on the executors nodes

– java.lang.OutOfMemoryError: GC overhead limit exceeded

– org.apache.spark.shuffle.FetchFailedException

原因和解决办法:

1) 执行处理的partitions 需要的内存超过分配的内存。

增加–executor memory 和 executor memory overhead

2)shuffle是很耗性能的操作,因为shuffle涉及磁盘IO,数据序列化和网络传输。

尽量避免shuffle。shuffle 操作把数据写在大量的partition中,而且不保证数据的均匀分布。shuffle 的块文件大小限制为2G。

3) 很多partition到最后都写超过2G。考虑增加partition 数量,保证每个partition的数据不会超过限制。

4) 如果有数据倾斜问题,可以添加一些hash值到key的后面,可以减少数据倾斜的情况。shuffle完成后,需要增加一些聚合操作,去掉添加的hash值,返回正常的key。参考Spark性能优化指南——高级篇

5) 如果partition数量超过2000,spark会高度压缩数据。如果任务需要大约2000个partition,可以repartition 为2001,存疑

6) 使用RDD时, 尽量避免使用groupByKey 操作。这样的操作会在内存中保存key对应的所有值。数据量大会导致OOM,因为他们不会被分配到磁盘上。使用reduceByKey代替,它会首先做一个map端的聚合,减少传输到reducer的数据。

7) jion 两个DataSet,其中一个非常小时,考虑使用broadcastjoin, broadcast小表。设置spark.sql.autoBroadcastJoinThreshold 的值大于小表的大小,或者强制使用:

left.join(broadcast(right), column)

8) 配置一个executor 对应多个core。既可以维持一定的并行度,又可以减少shuffle。shuffle取文件是executor之间的网络传输。

9) 当GC时间太长时。检测内存泄漏。

10) 为cache 数据分配足够的内存,spark.memory.storageFraction。只在数据会被重复使用时才cache。

11) 选择合适GC策略。G1GC更先进,更高的吞吐量和更低延迟。(不是大神不敢随便动)

2. driver堆内存溢出。

错误信息

java.lang.OutOfMemoryError: Java heap space on the driver node

1) --driver-memory 增加内存值

2) 避免是collect 方法,如果要保存数据,可以保存到文件中。

3) 当app有很多的task 或者 partition 时,更容易引起OOM。每个task都会发送一个mapStatus对象到driver。当发生shuffle操作时,数据在excutor之间重新划分。

4) shuffle stage 中 Spark为了避免重新计算,中间文件会写入磁盘。发往driver的map status 包含文件的位置信息。太多的任务会导致driver需要接受和处理倍增的map,输出reduce阶段需要的数据。

3. 集群磁盘空间不足

错误信息

– java.io.IOException: No space left on device

1) Check the executors for logs like this `UnsafeExternalSorter: Thread 75 spilling sort data of 141.0 MB to disk (90 times so far)`. This is an indication that the storage data is continuously evicted to disk. Exceptions like this occur when data becomes larger than what is configured to be stored on the node

2) 确保spark.memory.fraction 不是太低。默认0.6的堆空间,设置大些,会增减执行内存和存储数据的内存,减少溢出。

3) shuffle 会向磁盘写数据。一般尽量避免数据溢出到磁盘,除非计算的代价高于读写磁盘。

4) 增加节点数。

5) 默认,溢出的数据和shuffle 操作的中间数据,会写到/tmp. 可以通过spark.local.dirs 指定一个更大的目录。

4. App 执行时间过长,或者 无限期的卡住,没有展示处理。

1) 长时间执行的task,大多是数据倾斜造成的。解决方法是re-partition 确保数据均匀分布在tasks中。

2) task 的分发机制,依据最低的位置层级(locality level)选择executor. 尽管如此,被分配到一个Executor的task,也有可能被另外的空闲executor执行。这就导致这个task处理的数据需要通过网络读取,位置层级也会被改变。


分享到:


相關文章: