場景描述:面對大量複雜的數據分析需求,提供一套穩定、高效、便捷的企業級查詢分析服務具有重大意義。本次演講介紹了字節跳動基於SparkSQL建設大數據查詢統一服務TQS(Toutiao Query Service)的一些實踐以及在執行計劃調優、數據讀取剪枝、SQL兼容性等方面對SparkSQL引擎的一些優化。
關鍵詞:SparkSQL優化 字節跳動
作者: 來自字節跳動數據平臺查詢分析團隊。
目標和能力
為公司內部提供 Hive 、 Spark - SQL 等 OLAP 查詢引擎服務支持。
- 提供全公司大數據查詢的統一服務入口,支持豐富的API接口,覆蓋Adhoc、ETL等SQL查詢需求
- 支持多引擎的智能路由、參數的動態優化
- Spark-SQL/Hive引擎性能優化
針對SparkSQL,主要做了以下優化:
1. 執行計劃自動調優
•基於AE的 ShuffledHashJoin調整
•Leftjoinbuildleftmap技術
2. 數據讀取剪枝
•Parquetlocalsort
•BloomFilter&BitMap
•Prewhere
3. 一些其它優化
執行計劃調優
- 執行計劃的自動調優:
Spark Adaptive Execution ( Intel®Software),簡稱SparkAE,總體思想是將sparksql生成的1個job中的所有stage單獨執行,為每一個stage單獨創建一個子job,子job執行完後收集該stage相關的統計信息(主要是數據量和記錄數),並依據這些統計信息優化調整下游stage的執行計劃。
目前SparkAE主要支持的功能:
(1)數據傾斜的調整
(2)小task的合併
(3)sortmerge-> broadcase
Spark 有3種join方式:Broadcastjoin、ShuffledHashJoin、SortMergeJoin
普通leftjoin無法build 左表
優化點:
在AE的框架下,根據shuffle數據量大小,自動調整join執行計劃:SortMergeJoin調整為 ShuffledHashJoin•擴展支持left-join時將左表build成HashMap。
省去了大表join小表的情況下對shuffle數據的排序過程、join過程以HashMap完成,實現join提速。
- SortMergeJoin調整為ShuffledHashJoin
- Leftjoin build left sidemap
1、初始化表A的一個匹配記錄的映射表
目標:
對於Left-join的情況,可以對左表進行HashMapbuild。使得小左表leftjoin大右表的情況可以進行ShuffledHashJoin調整
難點:
Left-join語義:左表沒有join成功的key,也需要輸出
原理
在構建左表Map的時候,額外維持一個"是否已匹配"的映射表;在和右表join結束之後,把所有沒有匹配到的key,用null進行join填充。
以 Aleft join B 為例:
2、join過程中,匹配到的key置為1,沒有匹配到的項不變(如key3)
3、join結束後,沒有匹配到的項,生成一個補充結果集R2
4.合併結果集R1和結果集R2,輸出最終生成的join結果R。
優化結果
- 約95%左右的joinSQL有被調整成ShuffledHashJoin/BroadcastJoin
- 被優化的SQL整體速度提升20%~30%
- 整體執行時長縮短
基於Parquet數據讀取剪枝
以parquet格式數據為對象,在數據讀取時進行適當的過濾剪枝,從而減少讀取的數據量,加速查詢速度
優化點:
- LocalSort
- BoomFilter
- BitMap
- Prewhere
基於Parquet數據讀取剪枝:LocalSort
對parquet文件針對某個高頻字段進行排序。從而實現讀數據時RowGroup的過濾
目標:
- 自動選擇排序字段
- 生成文件時自動排序
Parquet文件讀取原理:
(1)每個rowgroup的元信息裡,都會記錄自己包含的各個列的最大值和最小值
(2)讀取時如何這個值不在最大值、最小值範圍內,則跳過RowGroup
生成hive分區文件時,先讀取metastore,獲取它是否需要使用localsort,如果需要,選擇它的高頻列是哪個。
基於Parquet數據讀取剪枝:BloomFilter&BitMap
整體優化結果:
- 命中索引平均性能提升 30%
- 生成時間增加:10%
- 空間開銷增加:5%
如何選取合適的列
Local_sort &BloomFilter & BitMap 如何自動生效
基於Parquet數據讀取剪枝:Prewhere
基於列式存儲各列分別存儲、讀取的特性•針對需要返回多列的SQL,先根據下推條件對RowId進行過濾、選取。再有跳過地讀取其他列,從而減少無關IO和後續計算•謂詞選擇(簡單、計算量小):in,=,<>,isnull,isnotnull
優化結果使得:特定SQL(Project16列,where條件 2列)SQL平均性能提升20%
其他優化
- Hive/SparkLoad分區Move文件優化:
通過調整staging目錄位置,實現在Load過程中mv文件夾,替代逐個mv文件,從而減少與NameNode的交互次數
- Spark生成文件合併
通過最後增加一個repartitionstage合併spark生成文件。
- Vcore
對於CPU使用率低的場景,通過vcore技術使得一個yarn-core可以啟動多個spark-core
- Spark 訪問hivemetastore 特定filter下推:
構造 get_partitions_by_filter實現 cast、substring等條件下推hivemetastore,從而減輕metastore返回數據量
運行期調優
在SQL執行前,通過統一的查詢入口,對其進行基於代價的預估,選擇合適的引擎和參數:
1.SQL分析
- 抽取Hiveexplain邏輯,進行SQL語法正確性檢查
- 對SQL包含的算子、輸入的數據量進行標註
2.自動引擎選擇/自動參數優化
標註結果自動選擇執行引擎:
- 小SQL走SparkServer(省去yarn申請資源耗時)
- 其他默認走Spark-Submit
標註結果選擇不同運行參數:
- Executor個數/內存
- Overhead、堆外內存
調優後使得Adhoc30s以內SQL佔比45%,Spark-Submit內存使用量平均減少20%。
閱讀更多 大數據與機器學習 的文章