Spark 2.x優化:在筆記本上一秒join10億條數據

原文地址:深入介紹新的Tungsten執行引擎

https://databricks.com/blog/2016/05/23/apache-spark-as-a-compiler-joining-a-billion-rows-per-second-on-a-laptop.html

Spark 2.x優化:在筆記本上一秒join10億條數據

背景

Apache Spark已經非常快了,但是我們能不能讓它再快10倍?

這個問題驅使我們重新考慮Spark的物理執行引擎層的構造。當我們深入任何一個現代的數據引擎(比如Spark和其它MPP數據庫),就會發現大量的CPU週期都耗費在不必要的工作上,比如虛擬函數的調用,寫中間數據到CPU緩存或者內存。

通過降低耗費在非必要工作上的CPU週期來提高性能,將是現代編譯器長期關注的事。

Apache Spark 2.0中附帶了第二代Tungsten engine。這一代引擎是建立在現代編譯器和MPP數據庫的想法上,並且把它們應用於數據的處理過程中。主要想法是通過在運行期間優化那些拖慢整個查詢的字節碼到一個單獨的函數中,消除虛擬函數的調用以及利用CPU寄存器來存放那些中間數據。作為這種流線型策略的結果,我們顯著提高CPU效率並且獲得了性能提升,我們把這些技術統稱為"整段代碼生成"(whole-stage code generation)。

過去:火山迭代模型(Volcano Iterator Model)

在深入介紹whole-stage code generation之前,讓我們先回顧一下現在的Spark(以及大多數數據庫系統)是如何運行的。讓我們看看這個簡單的查詢:掃描一個表,然後計算出滿足給定條件屬性值的總行數。

Spark 2.x優化:在筆記本上一秒join10億條數據

為了計算這個查詢,舊版本的Spark(1.x)會利用基於迭代模型的經典查詢評估策略(通常被稱為Volcano model)。在這個模型中,一個查詢由多個算子(operators)組成,每個算子都提供了next()接口,該接口每次只返回一個元組(tuple)給嵌套樹中的下一個算子。比如上面查詢中的Filter算子大致可以翻譯成下面的代碼:

Spark 2.x優化:在筆記本上一秒join10億條數據

每個算子都實現了迭代器接口,這樣就允許查詢執行引擎來優雅地組合任意的算子,而不必擔心每個算子提供的數據類型。所以Volcano模型在過去的二十年間成為了數據庫系統的標準,這個也是Spark使用的架構。

Volcano 對比 手寫代碼

如果我們給一個大學新生十分鐘的時間使用Java來實現上面的查詢。他很可能會想出一段迭代代碼來循環遍歷輸入,判斷條件並計算行數,如下所示:

Spark 2.x優化:在筆記本上一秒join10億條數據

上面編寫的代碼僅僅是專門解決一個給定的查詢,而且很明顯不能實現組合的功能。但是這兩種實現(Volcano與手寫代碼)方式在性能上有啥重要區別呢?一方面Spark和大多數關係型數據庫選擇這種可以對不同算子進行組合的結構;另一方面,我們有一個由新手在十分鐘編寫的程序。我們運行了一個簡單的基準測試,對比了"大學新生"版的程序和Spark版的程序在使用單個線程的情況下運行上面同一份查詢,並且這些數據存儲在磁盤上,格式為Parquet。下面是它們之間的對比:

Spark 2.x優化:在筆記本上一秒join10億條數據

正如你所看到的,大學新生手寫版本的程序要比Volcano模式的程序要快一個數量級!事實證明,6行的Java代碼是被優化過的,其原因如下:

1、沒有虛函數調用:在Volcano模型中,處理一個元組(tuple)最少需要調用一次next()函數。這些函數的調用是由編譯器通過虛函數調度實現的(通過vtable);而手寫版本的代碼沒有一個函數調用。雖然虛函數調度是現代計算機體系結構中重點優化部分,它仍然需要消耗很多CPU指令而且相當的慢,特別是調度數十億次。

2、內存和CPU寄存器中的臨時數據:在Volcano模型中,每次一個算子給另外一個算子傳遞元組的時候,都需要將這個元組存放在內存中;而在手寫版本的代碼中,編譯器(這個例子中是JVM JIT)實際上將臨時數據存放在CPU寄存器中。訪問內存中的數據所需要的CPU時間比直接訪問在寄存器中的數據要大一個數量級!

3、循環展開(Loop unrolling)和SIMD:當運行簡單的循環時,現代編譯器和CPU是令人難以置信的高效。編譯器會自動展開簡單的循環,甚至在每個CPU指令中產生SIMD指令來處理多個元組。CPU的特性,比如管道(pipelining)、預取(prefetching)以及指令重排序(instruction reordering)使得運行簡單的循環非常地高效。然而這些編譯器和CPU對複雜函數調用圖的優化極少,而這些函數正是Volcano模型依賴的。

這裡面的關鍵點是手寫版本代碼的編寫是正對上面的查詢,所以它充分利用到已知的所有信息,導致消除了虛函數的調用,將臨時數據存放在CPU寄存器中,並且可以通過底層硬件進行優化。

未來: Whole-stage Code Generation

通過上面的觀察,很自然的想到下一步的任務是開發運行時自動生成"手寫代碼"的功能,我們稱之為Whole-stage Code Generation。這個想法是受Thomas Neumann發表在VLDB 2011上的論文Efficiently Compiling Efficient Query Plans for Modern Hardware所啟發。

目標是能夠生成整個階段(whole-stage)的代碼,使得Spark計算引擎可以實現手寫代碼的性能,並且提供通用的功能。而不是在運行時依賴算子來處理數據,這些算子在運行時生成代碼,如果可能的話將所有的查詢片段組成到單個函數中,後面我們僅需要運行生成的代碼。

比如對於上面的查詢可以作為單個階段,Spark可以產生以下的JVM字節碼(這裡展示的是Java代碼)。複雜的查詢將會產生多個階段,這種情況下Spark將會產生多個不同的函數。

Spark 2.x優化:在筆記本上一秒join10億條數據

下面表達式中的explain()函數為整段代碼生成進行了擴展。在輸出的結果裡,當算子前面有一個*,說明整段代碼生成被啟用。在下面情況下,Range、Filter和兩個Aggregates算子都啟用了整段代碼生成。然而Exchange算子並沒有實現整段代碼生成,因為它需要通過網絡發送數據。

Spark 2.x優化:在筆記本上一秒join10億條數據

一直關注Spark開發的人可能會問: 我在Apache Spark 1.1中就聽說過代碼生成(code generation),它和這裡講的有啥區別?在過去和其他MPP查詢引擎一樣,Spark僅僅將代碼生成應用於表達式求值(expression evaluation),而且僅限於很小一部分的算子(比如Project, Filter)。也就是說,之前的代碼生成技術僅僅是加快了表達式的求值(比如1+a);然而今天的整段代碼生成技術實際上為整個查詢計劃生成代碼。

向量化(Vectorization)

Whole-stage code-generation技術對那些在大型數據集根據條件過濾的大規模簡單查詢非常有效,但還是存在那些無法生成代碼將整個查詢融合到一個函數的情況。有些算子可能非常地複雜(比如CSV解析或者Parquet解碼),或者有些情況下我們會整合第三方組件,而這些組件的代碼是無法集成到我們生成的代碼之中。

為了提高這些情況下的性能,我們提出一個新的方法叫做向量化(vectorization)。核心思想是:我們不是一次只處理一行數據,而是將許多行的數據分別組成batches,而且採用列式格式存儲;然後每個算子對每個batch進行簡單的循環來遍歷其中的數據。所以每次調用next()函數都會返回一批的元組,這樣可以分攤虛函數調用的開銷。採用了這些措施之後,這些簡單的循環也會使得編譯器和CPU運行的更加高效。

假設有一個表有三列 (id, name, score),下面展示了面向行格式和麵向列格式的內存佈局:

Spark 2.x優化:在筆記本上一秒join10億條數據

這種風格的處理可以實現上面提到三點中的兩點(也兩點分別是:幾乎沒有虛函數調用以及自動展開循環/SIMD),這種風格仍然需要將臨時數據存放在內存中而不是存放在CPU的寄存器中。所以我們只有在無法使用Whole-stage code-generation技術的情況下才會使用vectorization技術。

比如我們已經實現了一個新的矢量Parquet解碼器(vectorized Parquet reader)可以批量解碼和解壓。當解碼存在磁盤上的integer類型的列,新的解碼器大概是舊的解碼器的9倍!

Spark 2.x優化:在筆記本上一秒join10億條數據

性能基準測試

為了有個直觀的感受,我們記錄下在Spark 1.6和Spark 2.0中在一個核上處理一行的操作時間(單位是納秒),下面的表格能夠體現出新的Tungsten engine的威力。Spark 1.6使用的表達式代碼生成技術同樣在今天的其他商業數據庫中採用。

Spark 2.x優化:在筆記本上一秒join10億條數據

我們已經調查了客戶的workloads,並且對那些使用非常頻繁的算子實現了whole-stage code generation 技術,這些算子包括:filter, aggregate以及hash join。正如你在上表看到的,許多核心的算子在採用whole-stage code generation 技術之後的性能已經提高了一個數量級!然而有些算子(比如sort-merge join)天生就很慢,所以非常難對其進行優化。

你可以在這裡看到whole-stage code generation技術的威力,在那裡我們在一臺機器上對10億條記錄進行aggregations和joins操作。不管是在Databricks platform(Intel Haswell處理器,3核)還是在一臺2013年出售的Macbook Pro(Intel Haswell i7)電腦上,對10億條元組進行hash join操作採用的時間不到1秒!

在端到端查詢這個新引擎是如何工作的?除了whole-stage code generation和vectorization技術,對一般的查詢我們在Catalyst optimizer上也進行了很多其他的工作,比如nullability propagation。我們比較了Spark 1.6和Spark 2.0在使用TPC-DS查詢的基本分析,如下圖:

Spark 2.x優化:在筆記本上一秒join10億條數據

那是不是意味著你把Spark升級Spark 2.0,所以的workload將會變的比之前快10倍呢?其實不是,雖然我們相信新的Tungsten引擎為我們的性能優化實現了最好的數據處理架構;但是我們需要理解的是,不是所有的workload都能享受到同樣的程度。比如可變長度的數據類型(如字符串)對其操作本身就非常昂貴;還有些workloads受其他因素影響,比如I/O吞吐量以及元數據操作等。對於之前那些受CPU效率影響的Workloads將會獲得最大的效率。

總結

本文提到的絕大部分工作已經提交到Apache Spark的代碼中,並且將會在Spark 2.0版本發佈。關於whole-stage code generation技術的JIRA可以到SPARK-12795裡查看;而關於vectorization技術的JIRA可以到SPARK-12992查看。

總結一下,本文主要描述了第二代Tungsten執行引擎。

通過whole-stage code generation技術,這個引擎可以

(1)消除虛函數調用;

(2)將臨時數據從內存中移到CPU寄存器中;

(3)利用現代CPU特性來展開循環並使用SIMD功能。

通過vectorization技術,可以加快那些代碼生成比較複雜的算子運行速度。

對於數據處理中很多核心算子,新的引擎會使它們的運行速度提升一個數量級。

在未來,考慮到執行引擎的效率,我們大部分的優化工作將會轉移到優化I/O效率以及更好的查詢計劃。

更進一步閱讀

  • Watch Webinar: Apache Spark 2.0: Easier, Faster, and Smarter 地址:http://go.databricks.com/apache-spark-2.0-presented-by-databricks-co-founder-reynold-xin
  • Technical Preview of Apache Spark 2.0 Now on Databricks 地址:https://databricks.com/blog/2016/05/11/apache-spark-2-0-technical-preview-easier-faster-and-smarter.html
  • Approximate Algorithms in Apache Spark: HyperLogLog and Quantiles 地址: https://databricks.com/blog/2016/05/19/approximate-algorithms-in-apache-spark-hyperloglog-and-quantiles.html


分享到:


相關文章: