Spark Shuffle 过程

spark1.2 前 default Hash Shuffle.

spark1.2 后 default Sort Shuffle.

Tungsten Sort spark1.4 可选 (暂时不做介绍,以后会单独说)。

Sort Shuffle 和Hash Shuffle比较

  • Hash Shuffle

Hash Shuffle 每个maper为每个reducer输出一个单独的文件,如M个mapper,R个reducer,所以mapper生成M*R个文件,缺点很明显。

当然可以设置spark.shuffle.consolidateFiles” (default is “false”)为true。mapper的输出文件会被合并。比如设置 num-executors 为 E,spark.executor.cores为C,spark.Task.cpus为T,然后并行任务数 E*C/T, shuffle期间mapper产生的文件有E*C/T*R。大大减少文件数,提高性能。

spark.shuffle.consolidateFiles =false的执行图

Spark Shuffle 过程

spark.shuffle.consolidateFiles =true的执行图

Spark Shuffle 过程

  • Sort Shuffle

Sort Shuffle mapper结果输出到一个单独的文件中根据reducer的id排序并作为索引,这样很容易通过reducer id检索reducer相关联的数据块。每个reduceer对应数据分布子各个文件中。会在做reduce前进行实时合并。

Sort Shuffle 在map时会对数据排序,但是在reduce时不合并排序的结果, 如果数据需要排序,只能在reduce重新排序。

排序比较耗时,所以有时候sort Shuffle 的性能不一定比Hash Shuffle要好。

还有一点要说明,当reducer数量小于“spark.shuffle.sort.bypassMergeThreshold” (200 by default) 值时,就会使用Hash Shuffle。

Spark Shuffle 过程

另外:ShuffledRDD的作用

ShuffledRDD 中的 compute() 只负责将属于每个 partition 的数据 fetch 过来,之后使用 mapPartitions() 操作进行 aggregate,生成 MapPartitionsRDD。


分享到:


相關文章: