Spark Shuffle 過程

spark1.2 前 default Hash Shuffle.

spark1.2 後 default Sort Shuffle.

Tungsten Sort spark1.4 可選 (暫時不做介紹,以後會單獨說)。

Sort Shuffle 和Hash Shuffle比較

  • Hash Shuffle

Hash Shuffle 每個maper為每個reducer輸出一個單獨的文件,如M個mapper,R個reducer,所以mapper生成M*R個文件,缺點很明顯。

當然可以設置spark.shuffle.consolidateFiles” (default is “false”)為true。mapper的輸出文件會被合併。比如設置 num-executors 為 E,spark.executor.cores為C,spark.Task.cpus為T,然後並行任務數 E*C/T, shuffle期間mapper產生的文件有E*C/T*R。大大減少文件數,提高性能。

spark.shuffle.consolidateFiles =false的執行圖

Spark Shuffle 過程

spark.shuffle.consolidateFiles =true的執行圖

Spark Shuffle 過程

  • Sort Shuffle

Sort Shuffle mapper結果輸出到一個單獨的文件中根據reducer的id排序並作為索引,這樣很容易通過reducer id檢索reducer相關聯的數據塊。每個reduceer對應數據分佈子各個文件中。會在做reduce前進行實時合併。

Sort Shuffle 在map時會對數據排序,但是在reduce時不合並排序的結果, 如果數據需要排序,只能在reduce重新排序。

排序比較耗時,所以有時候sort Shuffle 的性能不一定比Hash Shuffle要好。

還有一點要說明,當reducer數量小於“spark.shuffle.sort.bypassMergeThreshold” (200 by default) 值時,就會使用Hash Shuffle。

Spark Shuffle 過程

另外:ShuffledRDD的作用

ShuffledRDD 中的 compute() 只負責將屬於每個 partition 的數據 fetch 過來,之後使用 mapPartitions() 操作進行 aggregate,生成 MapPartitionsRDD。


分享到:


相關文章: