寫Parquet的同時提高Spark作業性能300%

寫Parquet的同時提高Spark作業性能300%

不久前,我正在運行一個Spark ETL,該ETL從AWS S3提取數據進行了一些轉換和清理,並將轉換後的數據以Parquet格式寫回到AWS S3。 JSON Gzip格式的數據量約為350GB。 我的工作節點總共有48個核心,內存為280 GB。

運行該作業後,我注意到該作業持續進行了超過24小時,並且之間有多次失敗。 該錯誤消息曾經是—執行器在120000ms之後超時。 這項工作正在處理大量數據,並且數據寫入階段停留在某個地方。

看到工作日誌後,我看到工作人員被卡在亂堆的數據中,大量數據被溢出到磁盤上。

2018–06–27 12:21:42,671 INFO [UnsafeExternalSorter] — Thread 168 spilling sort data of 3.1 GB to disk (0 time so far)

尋找根本原因

我取消了工作,但在決定後決定檢查正在生成的查詢計劃。

<code>

dataframe

.explain

()/<code>

這是為這項工作制定的計劃。

== Physical Plan ==
Project [acquire_campaign#60036, … 274 more fields]
+- SortMergeJoin [custom__city#60100], [area_name#33572], LeftOuter
:- Sort [custom__city#60100 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(custom__city#60100, 200)


: +- Project [acquire_campaign#0 AS acquire_campaign#60036, … 274 more fields]
: +- FileScan json [acquire_campaign#0,… 18 more fields] Batched: false, Format: JSON, Location: InMemoryFileIndex[s3a://xxx…, PartitionFilters: [], PushedFilters: [], ReadSchema: struct +- *Sort [area_name#33572 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(area_name#33572, 200)
+- InMemoryTableScan [final_city#33571, area_name#33572]
+- InMemoryRelation [final_city#33571, area_name#33572], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
+- *Scan RedshiftRelation((select final_city, lower(trim(city_name)) area_name …)) [final_city#33571,area_name#33572] ReadSchema: struct

那裡!! 我終於看到光了。

問題似乎是正在發生的聯接Join。 我的原始數據與從Redshift中拉出的一個小表連接在一起,看起來好像都是在對數據進行改組(Exchange哈希分區),最後發生了SortMergeJoin,這非常昂貴,因為排序後的數據在內存中,從而很快填充了內存。 磁盤溢出。 因此前進的道路很明確。 我必須消除導致磁盤溢出的混洗。

解決方案

在閱讀了有關聯接如何工作以及Catalyst優化器如何優化查詢的一些知識之後,我意識到redshift表非常小。 我決定在加入時使用廣播提示。 這是使用廣播提示後的查詢計劃-

== Physical Plan ==
Project [acquire_campaign#26550 … 274 more fields]
+- BroadcastHashJoin [custom__city#26614], [area_name#86], LeftOuter, BuildRight
:- Project [acquire_campaign#0 AS acquire_campaign#26550, … 274 more fields]
: +- FileScan json [acquire_campaign#0,… 18 more fields] Batched: false, Format: JSON, Location: InMemoryFileIndex[s3a://xxx…, PartitionFilters: [], PushedFilters: [], ReadSchema: struct +- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, true]))
+- InMemoryTableScan [final_city#85, area_name#86]
+- InMemoryRelation [final_city#85, area_name#86], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
+- *Scan RedshiftRelation((select final_city, lower(trim(city_name)) area_name …)) [final_city#85,area_name#86] ReadSchema: struct

看哪 哈希交換完成後,用一個BroadcastExchange替換,如果表的大小很小,則該替換並不重要。 最終,該連接以BroadcastHashJoin的形式進行。 這比SortMergeJoin輕量得多。

最終,在將內核數增加到200之後,我運行了該作業,並看到該作業在2.3小時內完成,磁盤溢出最少,而以前則需要24個小時以上。 當考慮到增加的資源時,將近300%的改善。

尤里卡! 甜美成功的氣味令人陶醉。 :D

有關更多帖子,請關注我的出版物:-https://medium.com/polar-tropics

(本文翻譯自Shitij Goyal的文章《Improving Spark job performance while writing Parquet by 300%》,參考:https://medium.com/polar-tropics/improving-spark-job-performance-while-writing-parquet-by-300-40ccf487a6a5)


分享到:


相關文章: