數據庫中間件 MyCAT 源碼解析——分片結果合併(一)

轉載:https://www.javazhiyin.com/15910.html

1. 概述

相信很多同學看過 MySQL 各種優化的文章,裡面 99% 會提到:單表數據量大了,需要進行分片(水平拆分 or 垂直拆分)。分片之後,業務上必然面臨的場景:跨分片的數據合併。今天我們就一起來瞅瞅 MyCAT 是如何實現分片結果合併

跨分片查詢大體流程如下:

數據庫中間件 MyCAT 源碼解析——分片結果合併(一)

flow

和 《【單庫單表】查詢》 不同的兩個過程:

  • 【2】多分片執行 SQL
  • 【4】合併多分片結果

下面,我們來逐條講解這兩個過程。

2. 多分片執行 SQL

數據庫中間件 MyCAT 源碼解析——分片結果合併(一)

execute_sql

經過 SQL 解析後,計算出需要執行 SQL 的分片節點,遍歷分片節點發送 SQL 進行執行。

核心代碼

  • MultiNodeQueryHandler.java#execute(...)

SQL 解析 詳細過程,我們另開文章,避免內容過多,影響大家對 分片結果合併 流程和邏輯的理解。

3. 合併多分片結果

數據庫中間件 MyCAT 源碼解析——分片結果合併(一)

handle_response

和 《【單庫單表】查詢》 不同,多個分片節點都會分別響應 記錄頭(header)記錄行(row) 。在開始分析 MyCAT 是怎麼合併多分片結果之前,我們先來回想下 SQL 的執行順序。

FROM 
// [1] 選擇表 WHERE
// [2] 過濾表 GROUP BY
// [3] 分組SELECT
// [4] 普通字段,max / min / avg / sum / count 等函數,distinctHAVING
// [5] 再過濾表ORDER BY
// [6] 排序LIMIT
// [7] 分頁

3.1 記錄頭(header)

多個分片節點響應時,會響應多次 記錄頭(header) 。MyCAT 在實際處理時,只處理第一個返回的 記錄頭(header) 。因此,在使用時要保證表的 Schema 相同。

分片節點

響應的 記錄頭(header) 可以直接返回 MySQL Client 嗎?答案是不可以。AVG函數 是特殊情況,MyCAT 需要將 AVG 拆成 SUM + COUNT 進行計算。舉個例子:

// [1] MySQL Client => MyCAT : SELECT AVG(age) FROM student; 
// [2] MyCAT => MySQL Server : SELECT SUM(age) AS AVG0SUM, COUNT(age) AS AVG0COUNT FROM student;
// [3] 最終:AVG(age) = SUM(age) AS AVG0SUM / COUNT(age)

核心代碼

  • MultiNodeQueryHandler.java#fieldEofResponse(...)。

3.2 記錄行(row)

3.1 AbstractDataNodeMerge

MyCAT 對分片結果合併通過 AbstractDataNodeMerge 子類來完成。

數據庫中間件 MyCAT 源碼解析——分片結果合併(一)

merge_service

AbstractDataNodeMerge :

  • -packs :待合併記錄行(row)隊列。隊列尾部插入 END_FLAG_PACK 表示隊列已結束。
  • -running :合併邏輯是否正在執行中的標記。
  • ~onRowMetaData(...) :根據記錄列信息(ColMeta)構建對應的排序組件和聚合組件。需要子類進行實現。
  • ~onNewRecord(...) :插入記錄行(row) 到 packs。
  • ~outputMergeResult(...) :插入 END_FLAG_PACK 到 packs。
  • ~run(...) :執行合併分片結果邏輯,並將合併結果返回給 MySQL Client。需要子類進行實現。
數據庫中間件 MyCAT 源碼解析——分片結果合併(一)

AbstractDataNodeMerge_run.png

通過 running 標記保證同一條 SQL 同時只有一個線程正在執行,並且不需要等到每個分片結果都返回就可以執行聚合邏輯。當然,排序邏輯需要等到所有分片結果都返回才可以執行。

核心代碼

  • AbstractDataNodeMerge.java
  • DataNodeMergeManager.java#run(...)

3.2 DataNodeMergeManager

AbstractDataNodeMerge 有兩種子類實現:

  • DataMergeService :基於堆內內存合併分片結果。
  • DataNodeMergeManager :基於堆外內存合併分片結果。

目前官方默認配置使用 DataNodeMergeManager。主要有如下優點:

  1. 可以使用更大的內存空間。當併發量大或者數據量大時,更大的內存空間意味著更好的性能。
  2. 減少 GC 暫停時間。記錄行(row)對象小且重用性很低,需要能夠進行類似 C / C++ 的自主內存釋放。
  3. 更快的內存複製和讀取速度,對排序和聚合帶來很好的提速。

如果對堆外內存不太瞭解,推薦閱讀如下文章:

  1. 《從0到1起步-跟我進入堆外內存的奇妙世界》
  2. 《堆內內存還是堆外內存?》
  3. 《JAVA堆外內存》
  4. 《JVM源碼分析之堆外內存完全解讀》

本文主要分析 DataNodeMergeManager 實現,DataMergeService 可以自己閱讀或者等待後續文章(歡迎訂閱我的公眾號噢)。

DataNodeMergeManager 有三個組件:

  • globalSorter :UnsafeExternalRowSorter => 實現記錄行(row)合併並排序邏輯。
  • globalMergeResult :UnsafeExternalRowSorter => 實現記錄行(row)合併不排序邏輯。
  • unsafeRowGrouper : UnsafeRowGrouper => 實現記錄行(row)聚合邏輯。

DataNodeMergeManager#run(...) 邏輯如下:

  • [1] 寫入記錄行(row)到 UnsafeRow。
  • [2] 根據情況將 UnsafeRow 插入對應組件。
  • [3] 當所有 UnsafeRow 插入完後,根據情況使用組件聚合、排序。
數據庫中間件 MyCAT 源碼解析——分片結果合併(一)

核心代碼

  • DataNodeMergeManager.java。

看到這裡,可能很多同學都有點懵逼,問題不大,我們繼續往下瞅。

3.3 UnsafeRow

數據庫中間件 MyCAT 源碼解析——分片結果合併(一)

unsafe_row

記錄行(row)寫到 UnsafeRow 的 baseObject 屬性,結構如下:

數據庫中間件 MyCAT 源碼解析——分片結果合併(一)

unsafe_row_object

數據庫中間件 MyCAT 源碼解析——分片結果合併(一)

unsafe_row_2.png

  • 拆分成三個區域,每個區域按照格子記錄信息,每個格子 64bits(8 Bytes)
  • 記錄行(row)按照字段順序位置記錄到 baseObject。
  • [1] 空標記位區域 :標記字段對應的值是否為 NULL。
  • 當字段對應的值為 NULL 時,其對應的字段順序對應的 bit 設置為 1。舉個例子,第 0 個位置字段為 NULL,則第一個格子對應的 64 bits 從右邊第一個 bit 設置為 1。
  • 因為每個格子是 64 bits,每 64 個字段佔用一個格子,不滿一個格子,按照一個格子計算。因此,該區域的長度(bitSetWidthInBytes) = 字段佔用的格子數 * 64 bits。
  • [2] 位置長度區域 :記錄字段對應的值在[3]區域所在的位置和長度。
  • 每個字段記錄[2]區域的位置 = baseOffset + bitSetWidthInBytes + 8 Bytes * 字段順序。
  • 佔用一個格子,前 32 bits 為[3]區域的位置,後 32 bits 為字段對應的值長度。
  • [3] 值區域 :記錄字段對應的值。
  • 每個字段對應的值佔用格子數 = 字段對應的值長度 / 8 Byte,如果無法整除再 + 1。
  • 因為字段對應的值可能無法剛好佔滿每個格子,未使用的 bit 用 0 佔位。

寫入 UnsafeRow,MyCAT 可以順序訪問每個字段,而不需要在記錄行(row)進行遍歷。

日常開發使用位操作的機會比較少,可能較為難理解,需要反覆理解下,相信會獲得很大啟發。恩,該部分代碼引用自開源運算框架 Spark,是不是更加有動力列。

核心代碼

  • UnsafeRow.java
  • BufferHolder.java
  • UnsafeRowWriter.java

3.4 UnsafeExternalRowSorter

如果使用 Java 實現 SELECT * FROM student ORDER BY age desc, nickname asc,不考慮算法優化的情況下,我們可以簡單如下實現:

Collections.sort(students,new Comparator<comparable>(){
@Override
public int compare(Student o1,Student o2){
int cmp=compare(o2.age,o1.age);

return cmp!=0?cmp:compare(o1.nickname,o2.nickname);
}
}}
);
/<comparable>

從功能上,UnsafeExternalRowSorter 是這麼實現排序邏輯。當然肯定的是,不是這麼“簡單”的實現。

數據庫中間件 MyCAT 源碼解析——分片結果合併(一)

sorter_write

UnsafeRow 會寫入到兩個地方:

  1. List<memoryblock> :內存塊數組。當前 MemoryBlock 無法容納寫入的 UnsafeRow時,生成新的 MemoryBlock 提供寫入。每條 UnsafeRow 存儲在 MemoryBlock 由 長度 + 字節內容 組成。/<memoryblock>
  2. LongArray :每條 UnsafeRow 存儲在 LongArray 由兩部分組成:address + prefix。
  • address :UnsafeRow 存儲在 List<memoryblock> 的位置。前 13 bits 記錄所在 MemoryBlock 的 index,後 51 bit 記錄在 MemoryBlock 的 offset。/<memoryblock>
  • prefix :UnsafeRow 第一個排序字段前 64 bits 計算的值。

UnsafeExternalRowSorter 排序實現方式 :提供 TimSortRadixSort 兩種排序算法,前者為默認實現。TimSort 折半查找時,使用 LongArray,先比較 prefix,若相等,則順序對比每個排序字段直到不等,提升計算效率。插入操作在 LongArray 操作,List<memoryblock> 只作為原始數據。/<memoryblock>

另外,當需要排序特別大的數據量時,會使用存儲數據到文件進行排序。限於筆者暫時未閱讀該處源碼,後續會另開文章分析。

核心源碼:

  • UnsafeExternalRowSorter.java
  • UnsafeExternalRowSorter.java
  • TimSort.java

3.5 UnsafeRowGrouper

如果使用 Java 實現 SELECT nickname, COUNT(*) FROM student group by nickname,不考慮算法優化的情況下,我們可以簡單如下實現:

Map<string>>map=new HashMap<>();// 聚合
for (student : students) {
if (map.contains(student.nickname)) {
map.put(student.nickname, map.get(student.nickname).get(1) + 1);
} else {
List<object> value = new Array<>();
value.add(nickname);
value.add(1);
map.put(student.nickname, value);
}
}// 輸出
for (value : map.values) {
System.out.println(value);
}
/<object>/<string>

從功能上,UnsafeRowGrouper 是這麼實現排序邏輯。當然肯定的是,也不是這麼“簡單”的實現。

具體怎麼實現的呢?我們在《MyCAT 源碼解析 —— 分片結果合併(二)》繼續分析。

4. 救護中心

看到此處的應該是真愛吧?!如果內容上有什麼錯誤或者難懂的地方,可以關注我的微信公眾號給我留言,我會很認真的逐條解答的。“萬一”覺得本文還可以,希望轉發到朋友圈讓更多的人看到。


分享到:


相關文章: