一文詳解 OceanBase 並行執行引擎實現

OB君:本文整理自OceanBase TechTalk第四期杭州站由螞蟻金服OB團隊技術專家餘璜(花名:曉楚)的演講,本文將帶讀者深入瞭解OceanBase 2.0的並行執行框架。

Tips:您可以關注“OceanBase”公眾號回覆“0512”獲取現場PPT

一文詳解 OceanBase 並行執行引擎實現

背景介紹

OceanBase 在公司內外越來越多的場景落地,需求也開始多樣化。客戶不僅希望 OceanBase 提供優異的 OLTP 能力,還希望無需導出數據到第三方平臺,就地提供 OLAP(Online Analytical Processing)能力。

在 OceanBase 1.4 版本中,我們已經提供了基本的分佈式執行能力,但由於其設計考量的核心應用場景還是給類 OLTP 查詢提供基礎分佈式數據訪問能力,對於大數據量的查詢場景,以及分佈式複雜查詢的支持都比較有限,主要存在以下幾個問題:

  1. 掃描粒度固定:只能以分區為基本單位掃描數據,導致大量分區數場景下需要發起大量 RPC;不支持分區內並行
  2. 調度能力有限:單步調度,無法進一步發掘並行能力
  3. 數據不能流水:中間結果落盤,無法形成數據流水線

為了更好地適應客戶需求,我們在 OceanBase 2.0 版本中引入了全新的並行執行框架,融合了 1.4 版本的分佈式數據訪問能力,同時增強了並行執行的能力,支持對 OceanBase 中的海量數據進行在線分析處理,

使得 OceanBase 向 OLTP + OLAP 融合型數據庫的路上邁出了第一步

並行執行框架

OceanBase 是一個 Share Nothing 的數據庫,數據以分片的形式存儲於每個節點,節點之間通過千兆、萬兆網絡通信。

一文詳解 OceanBase 並行執行引擎實現

OceanBase 分佈式計算架構示意圖

一般會在每個節點上部署一個叫做 observer 的進程,它是 OceanBase 對外服務的主體。OceanBase 會根據一定的均衡策略將數據分片均衡到多個 observer 上,於是,一個並行查詢,一般需要同時訪問多個 observer。

一文詳解 OceanBase 並行執行引擎實現

數據分片示意圖

SQL語句並行執行流程

當用戶給定的 SQL 語句需要訪問的數據位於 2 臺或 2 臺以上 observer 時,就會啟用並行執行,會執行如下步驟:

  1. 用戶所連接的這個 observer 將承擔查詢協調者(Query Coordinator,QC)的角色
  2. QC 預約足夠的線程資源
  3. QC 將需要並行的計劃拆成多個子計劃(下稱 DFO, Data Flow Operation),每個 DFO 包含若干個串行執行的算子。例如,一個 DFO 裡包含了掃描分區,聚集,發送算子,另外一個 DFO 裡包含了收集、聚集算子等。
  4. QC 按照一定的邏輯順序將 DFO 調度到合適的 observer 上執行,observer 上會臨時啟動一個輔助協調者(Sub Query Coordinator,SQC),SQC 負責在所在 observer 上為各個 DFO 申請執行資源、構造執行上下文環境等,然後啟動 DFO 在各個 observer 上並行執行
  5. 當各個 DFO 都執行完畢,QC 會串行執行剩餘部分的計算。如,一個並行的 COUNT 算法,最終需要 QC 將各個機器上的計算結果做一個 SUM 運算。
  6. QC 所在線程將結果返回給客戶端
一文詳解 OceanBase 並行執行引擎實現

優化器負責決策生成一個怎樣的並行計劃,QC 負責具體執行該計劃。例如,兩分區表 JOIN,優化器根據規則和代價信息,可能生成一個分佈式的 PARTITION WISE JOIN 計劃,也可能生成一個 HASH-HASH 打散的分佈式 JOIN 計劃。計劃一旦確定,QC 就會將計劃拆分成多個 DFO,有序調度執行。

並行度與任務劃分方法

我們用並行度 (Degree Of Parallelism, DOP)的概念指定用多少個線程(Worker)來執行一個 DFO。目前 OceanBase 通過 parallel 這個 hint 來指定並行度。確定並行度後,會將 DOP 拆分到需要運行 DFO 的多個 server 上。

對於包含掃描的 DFO,會計算 DFO 需要訪問哪些 partition,這些 partition 分佈在哪些 server 上,然後將 DOP 按比例劃分給這些 server。例如,DOP = 6,DFO 要訪問 120 個 partition,其中 server 1 上有 60 個 partition, server 2 上有 40 個 partition,server 3 上有 20 個 partition,那麼,會給 server 1 上分 3 個線程, server 2 上分 2 個線程,server 3 上分 1 個線程,達到平均每個線程可以處理 20 個 partition 的效果。如果 DOP 和 partition 數不能整除,會做一定的調整,達到長尾儘可能短的目的。

如果每個機器上分得的 worker 數遠大於分區數,會自動做分區內並行。每個分區會以宏塊為邊界切分成若干個掃描任務,由多個 worker 爭搶執行。

為了將這種劃分能力進行抽象和封裝,引入 Granule 的概念。每個掃描任務稱為一個 Granule,這個掃描任務既可以是掃一個 partition,也可以掃 partition 中的一小塊範圍。

一文詳解 OceanBase 並行執行引擎實現

partition 的切分需要把握一個度,既不能且得太粗,也不能太細。太粗,容易出現 worker 工作量不均衡,太細,掃描時反覆從一個 granule 切換到下一個 granule 造成的開銷太大。目前使用了一個經驗值,每個 worker 平均可以拿到 13 個 Granule 是最合適的。多個 Granule 串成一個鏈表,由各個 worker 從鏈表上搶任務執行。

一文詳解 OceanBase 並行執行引擎實現

Worker 爭搶 Granule 的例子

對於不包含掃描任務的 DFO,會分配在 child DFO 所在的機器上,以儘可能減少一些跨機的數據傳輸。

部分 DFO 不能並行執行,會被打上 LOCAL 標記,QC 會在本地調度這樣的 DFO,且強制將其並行度設置為 1。

並行調度方法

優化器生成並行計劃後,QC 會將其切分成多個 DFO。如下圖,是 t1、t2 表做 HASH JOIN,切分成了 3 個 DFO,DFO 1、DFO 2 負責並行掃描數據,並將數據 HASH 到對應節點,DFO 3 負責做 HASH JOIN,並將最終的 HASH 結果彙總到 QC。

一文詳解 OceanBase 並行執行引擎實現

QC 會盡量使用 2 組線程來完成計劃的調度,例如上面的例子中,QC 首先會調度 DFO 1 和 DFO 3,DFO 1 開始執行後就開始掃數據, 並吐給 DFO 3,DFO 3 開始執行後,首先會阻塞在 HASH JOIN 建 hash table 的步驟上,也就是會一直會從 DFO 1 收數據,直到全部收齊,建立好 hash table。然後 DFO 3 會從右邊的 DFO 2 收數據。這時候 DFO 2 還沒有被調度起來,所以 DFO 3 會等待在收數據的流程上。DFO 1 在把數據都發送給 DFO 3 後就可以讓出線程資源退出了。調度器回收了 DFO 1 的線程資源後,立即會調度 DFO 2。 DFO 2 開始運行後就開始給 DFO 3 發送數據,DFO 3 每收到一行 DFO 2 的數據就回去 hash table 中查表,如果命中,就會立即向上輸出給 QC,QC 負責將結果輸出給客戶端。

讀到這裡大家通常會有這麼兩個疑問:

  1. 為什麼是先調度 DFO 1,再調度 DFO 2 呢?從右到左調度,先 DFO 2 再 DFO 1 不行嗎?
  2. 上面只用 2 組線程就完成了調度,是不是巧合? 如果換了 MERGE JOIN 呢?

首先,所有的算子都遵循一個約定俗成的習慣,都是先 open 左支,再 open 右支,處理首行數據的順序也是先左後右。這也就使得調度的順序也要和這種習慣相一致,否則即使調度了,也無法推進。

其次,2 組線程完成調度,這是我們設計出來的,是並行框架設計中的人為限制,我們希望計劃不要佔用過多的線程組就能推進。為了達到這個目的,對於所有的左深樹調度,如果有 DFO 需要同時從左右 DFO 讀數據,那麼我們會在這個 DFO 中插入一些阻塞性算子(如 Sort、Matierial),強行先把左側 DFO 中數據全部收取上來。例如 TPCH Q12 的並行計劃如下,箭頭處的 MERGE JOIN(MJ)需要從左邊、右邊同時收取數據才能做 JOIN,為了達到這個目的,MERGE JOIN 左側是一個 MERGE SORT RECEIVE IN(IN.SORT)算子,它是阻塞算子,需要把下面的所有數據都收上來之後,才會向 MERGE JOIN 吐出第一行。

一文詳解 OceanBase 並行執行引擎實現

另外,還需要注意到一個情況,依然以上圖為例,考慮場景:MERGE JOIN 匹配了一行結果,會往上吐,圖中吐出的結果會被 HASH GROUPBY(HASH GBY)全部緩存下來,直到 MERGE JOIN 匹配完所有數據後, HASH GROUPBY 才會對上層吐數據。這時讀 orders 表、lineitem 表的 DFO 都已經調度結束,最上層的 DFO 被調度,正好能消費 HASH GROUPBY 對上層吐出的數據。

假設 MERGE JOIN 上面沒有 HASH GROUP BY 算子怎麼辦?這時數據會嘗試向上吐,但上面的 DFO 還沒有被調度,MERGE JOIN 吐出的數據發不出去會導致 MERGE JOIN 被阻塞,整個 Query 都無法推進。為了解決這個問題,我們會在 MERGE JOIN 上面插入一個 Material 算子,用它來緩存 MERGE JOIN 吐出的所有數據。

但也有特殊的情況,存在 2 組線程無論如何都搞不定的場景,這時候我們也支持分配 3 組線程。具體參見 特色功能-調度 章節的討論。

網絡通信方法

一對有關聯的 DFO,child DFO 作為生產者分配了 M 個 Worker 線程, parent DFO 作為消費者分配了 N 個 Worker 線程。他們之間的數據傳輸需要用到 M * N 個網絡通道。

一文詳解 OceanBase 並行執行引擎實現

為了對這種網絡通信進行抽象,引入數據傳輸層(Data Transfer Layer, DTL)的概念,任意兩點之間的通信連接用通道(channel)的概念來描述。

通道分為發送端和接收端,在最初的實現中我們允許發送端無限地給接收端發送數據,但發現如果接收端無法立即消費掉這些數據,可能會導致接收端內存爆,所以加入了流控邏輯。每個 channel 接收端預留了三個槽位,當槽位被數據佔滿時會通知發送端暫停發送數據,當有接收端數據被消費空閒槽位出現時通知發送端繼續發送。

資源控制與 Query 排隊

PX 是以線程為基本單位分配運行資源,有一個固定大小的共享線程池供每個租戶的 PX 請求。當併發請求較多,線程資源不夠時,會讓請求線程失敗的 Query 排隊。

分佈式場景下,如果一個 Query 已經獲取了一部分線程,另一部分線程獲取失敗,會重試獲取線程,如果重試若干秒後依然無法獲取到線程資源,說明當前系統繁忙,會讓當前 Query 失敗。

特色功能

1. 調度

OceanBase 2.0 支持多種形態的計劃調度,例如常見的左深樹、右深樹,以及之字型樹(Zig-Zag Tree)

一文詳解 OceanBase 並行執行引擎實現

樹的形態決定了調度需要的線程資源組數。左深樹的調度是最簡單的,只需要同時啟動兩組線程就可以驅動左深樹的執行,對於右深樹,則需要 3 組線程才能驅動。如上圖,用數子標註了調度順序,數子相同時表示對應邊會同時調度,上面的右深樹中的數字 3、4 分別對應了兩條邊,兩條邊連接了 3 個 DFO,這 3 個 DFO 會同時調度。

下面略微變形的左深樹、之字型樹雖然看起來複雜不少,它們本質和上面的的 3 種樹型是一樣的,調度上依然可以輕鬆支持。

一文詳解 OceanBase 並行執行引擎實現

除了上面介紹的樹型之外,都稱之為 Bushy Tree,目前 Bushy Tree 的調度不支持流水執行。當遇到 Bushy Tree 時,回退為單步執行模式,每次只調度執行一個 DFO,寫中間結果。

一文詳解 OceanBase 並行執行引擎實現

2. 流水與落盤

上面提到單步調度、雙 DFO 調度、 3-DFO 調度,那麼不同的調度方式,對數據的流水有什麼影響呢?

首先看單步調度,每一步計算都寫中間結果,數據無法流水。

再看雙 DFO 調度,由於同時調度 2 層 DFO,下層 DFO 作為生產者,上層 DFO 作為消費者,可以形成一個局部流水線,下層生產的數據無需落盤,可以直接通過網絡推送給上層消費。

但是,雙 DFO 調度推進若干步後,還是會面臨落盤的場景。例如下面的左深樹中,紅色節點表示當前正在被調度的 DFO,灰色表示已經調度完成的 DFO,白色表示尚未調度的 DFO。左圖中,HASH JOIN 1 的 hash 表已經通過 t1 送來的數據建好,從右邊 t2 表每讀入一行,就可以立即向上面輸出匹配的數據。但是,HASH JOIN 2 還沒有開始調度執行,無法接受 HASH JOIN 1 的數據。作為應對,需要將 HASH JOIN 1 的結果暫存在內存中,甚至落盤。當 HASH JOIN 2 被調度後,才開始消費暫存的數據,如右圖所示。

一文詳解 OceanBase 並行執行引擎實現

我們還有一種策略避免落盤,即 3-DFO 調度。還是用上面的例子來說明,SCAN t2、HASH JOIN 1、HASH JOIN 2 同時調度,就可以避免在 HASH JOIN 1 中強制落盤。這麼做的代價是:需要啟動更多組線程。

一文詳解 OceanBase 並行執行引擎實現

是不是還可以有 N-DFO 調度策略呢?理論上是可以的,但通常這麼做沒什麼意義。處在高層的 DFO 在調度執行後,會很長一段時間無事可做,直到首行輸入到來。

綜上所述,DFO 的並行調度層數和流水的長度緊密相關,到底並行調度多少層 DFO 最合適,需要根據系統的資源富裕程度來合理決策。考慮到 SQL 中很多算子(如 Sort、Group By 等)天然具備暫存結果的能力,兩層 DFO 就可以滿足 OceanBase 大部分場景的需求。

3. 可變並行度

並行度用於控制用多少線程來執行一個 DFO,一種簡單策略是每個 DFO 的並行度都一樣,但在部分場景下這不是最優選擇,考慮下面的場景:t1, t2 做連接,其中 t1 是大表, t2 是小表,但 t1 經過複雜條件過濾後輸出的行數很少,廣播到 t2 表上做連接。那麼可以考慮給 t1 表掃描分配較大並行度,t2 掃描以及 join 分配較小的並行度。如下圖:


一文詳解 OceanBase 並行執行引擎實現

雖然可變並行度的設計是很符合直覺的,但並不是所有的數據庫都支持這麼做。允許可變並行度,可以讓執行優化具備更多的靈活性,在不損失效率的基礎上更加節省資源。

未來工作

目前,OceanBase 並行框架還遠遠沒有達到成熟,未來我們會在以下幾個方面繼續夯實基礎:

  • 資源管控,包括更好的 CPU 資源管控策略,memory 管控策略等
  • 性能分析與診斷框架
  • 容錯策略
  • 算子、表達式性能優化,數據迭代模式的優化
  • 網絡框架優化
  • 存儲優化

We are Hiring!

OceanBase 九年如一日,不忘初心,砥礪前行,致力於實現一箇中國人完全自主設計的分佈式通用數據庫系統,打破西方大廠在商業數據庫領域的絕對壟斷地位。時至今日,OceanBase已經成功應用於螞蟻金服的交易、支付、賬務等核心系統和網商銀行、印度Paytm等業務系統。

非常歡迎有志於讓中國的政府和企業用上中國人自己的通用商業數據庫的同學加入我們,一起為實現這一目標而共同努力!發送簡歷到[email protected],我們等的就是你!


分享到:


相關文章: