轉載:https://www.javazhiyin.com/15910.html
1. 概述
相信很多同學看過 MySQL 各種優化的文章,裡面 99% 會提到:單表數據量大了,需要進行分片(水平拆分 or 垂直拆分)。分片之後,業務上必然面臨的場景:跨分片的數據合併。今天我們就一起來瞅瞅 MyCAT 是如何實現分片結果合併。
跨分片查詢大體流程如下:
flow
和 《【單庫單表】查詢》 不同的兩個過程:
- 【2】多分片執行 SQL
- 【4】合併多分片結果
下面,我們來逐條講解這兩個過程。
2. 多分片執行 SQL
execute_sql
經過 SQL 解析後,計算出需要執行 SQL 的分片節點,遍歷分片節點發送 SQL 進行執行。
核心代碼:
- MultiNodeQueryHandler.java#execute(...)
SQL 解析 詳細過程,我們另開文章,避免內容過多,影響大家對 分片結果合併 流程和邏輯的理解。
3. 合併多分片結果
handle_response
和 《【單庫單表】查詢》 不同,多個分片節點都會分別響應 記錄頭(header) 和 記錄行(row) 。在開始分析 MyCAT 是怎麼合併多分片結果之前,我們先來回想下 SQL 的執行順序。
FROM
// [1] 選擇表 WHERE
// [2] 過濾表 GROUP BY
// [3] 分組SELECT
// [4] 普通字段,max / min / avg / sum / count 等函數,distinctHAVING
// [5] 再過濾表ORDER BY
// [6] 排序LIMIT
// [7] 分頁
3.1 記錄頭(header)
多個分片節點響應時,會響應多次 記錄頭(header) 。MyCAT 在實際處理時,只處理第一個返回的 記錄頭(header) 。因此,在使用時要保證表的 Schema 相同。
分片節點 響應的 記錄頭(header) 可以直接返回 MySQL Client 嗎?答案是不可以。AVG函數 是特殊情況,MyCAT 需要將 AVG 拆成 SUM + COUNT 進行計算。舉個例子:
// [1] MySQL Client => MyCAT : SELECT AVG(age) FROM student;
// [2] MyCAT => MySQL Server : SELECT SUM(age) AS AVG0SUM, COUNT(age) AS AVG0COUNT FROM student;
// [3] 最終:AVG(age) = SUM(age) AS AVG0SUM / COUNT(age)
核心代碼:
- MultiNodeQueryHandler.java#fieldEofResponse(...)。
3.2 記錄行(row)
3.1 AbstractDataNodeMerge
MyCAT 對分片結果合併通過 AbstractDataNodeMerge 子類來完成。
merge_service
AbstractDataNodeMerge :
- -packs :待合併記錄行(row)隊列。隊列尾部插入 END_FLAG_PACK 表示隊列已結束。
- -running :合併邏輯是否正在執行中的標記。
- ~onRowMetaData(...) :根據記錄列信息(ColMeta)構建對應的排序組件和聚合組件。需要子類進行實現。
- ~onNewRecord(...) :插入記錄行(row) 到 packs。
- ~outputMergeResult(...) :插入 END_FLAG_PACK 到 packs。
- ~run(...) :執行合併分片結果邏輯,並將合併結果返回給 MySQL Client。需要子類進行實現。
AbstractDataNodeMerge_run.png
通過 running 標記保證同一條 SQL 同時只有一個線程正在執行,並且不需要等到每個分片結果都返回就可以執行聚合邏輯。當然,排序邏輯需要等到所有分片結果都返回才可以執行。
核心代碼:
- AbstractDataNodeMerge.java
- DataNodeMergeManager.java#run(...)
3.2 DataNodeMergeManager
AbstractDataNodeMerge 有兩種子類實現:
- DataMergeService :基於堆內內存合併分片結果。
- DataNodeMergeManager :基於堆外內存合併分片結果。
目前官方默認配置使用 DataNodeMergeManager。主要有如下優點:
- 可以使用更大的內存空間。當併發量大或者數據量大時,更大的內存空間意味著更好的性能。
- 減少 GC 暫停時間。記錄行(row)對象小且重用性很低,需要能夠進行類似 C / C++ 的自主內存釋放。
- 更快的內存複製和讀取速度,對排序和聚合帶來很好的提速。
如果對堆外內存不太瞭解,推薦閱讀如下文章:
- 《從0到1起步-跟我進入堆外內存的奇妙世界》
- 《堆內內存還是堆外內存?》
- 《JAVA堆外內存》
- 《JVM源碼分析之堆外內存完全解讀》
本文主要分析 DataNodeMergeManager 實現,DataMergeService 可以自己閱讀或者等待後續文章(歡迎訂閱我的公眾號噢)。
DataNodeMergeManager 有三個組件:
- globalSorter :UnsafeExternalRowSorter => 實現記錄行(row)合併並排序邏輯。
- globalMergeResult :UnsafeExternalRowSorter => 實現記錄行(row)合併不排序邏輯。
- unsafeRowGrouper : UnsafeRowGrouper => 實現記錄行(row)聚合邏輯。
DataNodeMergeManager#run(...) 邏輯如下:
- [1] 寫入記錄行(row)到 UnsafeRow。
- [2] 根據情況將 UnsafeRow 插入對應組件。
- [3] 當所有 UnsafeRow 插入完後,根據情況使用組件聚合、排序。
核心代碼:
- DataNodeMergeManager.java。
看到這裡,可能很多同學都有點懵逼,問題不大,我們繼續往下瞅。
3.3 UnsafeRow
unsafe_row
記錄行(row)寫到 UnsafeRow 的 baseObject 屬性,結構如下:
unsafe_row_object
unsafe_row_2.png
- 拆分成三個區域,每個區域按照格子記錄信息,每個格子 64bits(8 Bytes)。
- 記錄行(row)按照字段順序位置記錄到 baseObject。
- [1] 空標記位區域 :標記字段對應的值是否為 NULL。
- 當字段對應的值為 NULL 時,其對應的字段順序對應的 bit 設置為 1。舉個例子,第 0 個位置字段為 NULL,則第一個格子對應的 64 bits 從右邊第一個 bit 設置為 1。
- 因為每個格子是 64 bits,每 64 個字段佔用一個格子,不滿一個格子,按照一個格子計算。因此,該區域的長度(bitSetWidthInBytes) = 字段佔用的格子數 * 64 bits。
- [2] 位置長度區域 :記錄字段對應的值在[3]區域所在的位置和長度。
- 每個字段記錄[2]區域的位置 = baseOffset + bitSetWidthInBytes + 8 Bytes * 字段順序。
- 佔用一個格子,前 32 bits 為[3]區域的位置,後 32 bits 為字段對應的值長度。
- [3] 值區域 :記錄字段對應的值。
- 每個字段對應的值佔用格子數 = 字段對應的值長度 / 8 Byte,如果無法整除再 + 1。
- 因為字段對應的值可能無法剛好佔滿每個格子,未使用的 bit 用 0 佔位。
寫入 UnsafeRow,MyCAT 可以順序訪問每個字段,而不需要在記錄行(row)進行遍歷。
日常開發使用位操作的機會比較少,可能較為難理解,需要反覆理解下,相信會獲得很大啟發。恩,該部分代碼引用自開源運算框架 Spark,是不是更加有動力列。
核心代碼:
- UnsafeRow.java
- BufferHolder.java
- UnsafeRowWriter.java
3.4 UnsafeExternalRowSorter
如果使用 Java 實現 SELECT * FROM student ORDER BY age desc, nickname asc,不考慮算法優化的情況下,我們可以簡單如下實現:
Collections.sort(students,new Comparator<comparable>(){
@Override
public int compare(Student o1,Student o2){
int cmp=compare(o2.age,o1.age);
return cmp!=0?cmp:compare(o1.nickname,o2.nickname);
}
}}
);
/<comparable>
從功能上,UnsafeExternalRowSorter 是這麼實現排序邏輯。當然肯定的是,不是這麼“簡單”的實現。
sorter_write
UnsafeRow 會寫入到兩個地方:
- List<memoryblock> :內存塊數組。當前 MemoryBlock 無法容納寫入的 UnsafeRow時,生成新的 MemoryBlock 提供寫入。每條 UnsafeRow 存儲在 MemoryBlock 由 長度 + 字節內容 組成。/<memoryblock>
- LongArray :每條 UnsafeRow 存儲在 LongArray 由兩部分組成:address + prefix。
- address :UnsafeRow 存儲在 List<memoryblock> 的位置。前 13 bits 記錄所在 MemoryBlock 的 index,後 51 bit 記錄在 MemoryBlock 的 offset。/<memoryblock>
- prefix :UnsafeRow 第一個排序字段值前 64 bits 計算的值。
UnsafeExternalRowSorter 排序實現方式 :提供 TimSort 和 RadixSort 兩種排序算法,前者為默認實現。TimSort 折半查找時,使用 LongArray,先比較 prefix,若相等,則順序對比每個排序字段直到不等,提升計算效率。插入操作在 LongArray 操作,List<memoryblock> 只作為原始數據。/<memoryblock>
另外,當需要排序特別大的數據量時,會使用存儲數據到文件進行排序。限於筆者暫時未閱讀該處源碼,後續會另開文章分析。
核心源碼:
- UnsafeExternalRowSorter.java
- UnsafeExternalRowSorter.java
- TimSort.java
3.5 UnsafeRowGrouper
如果使用 Java 實現 SELECT nickname, COUNT(*) FROM student group by nickname,不考慮算法優化的情況下,我們可以簡單如下實現:
Map<string>>map=new HashMap<>();// 聚合
for (student : students) {
if (map.contains(student.nickname)) {
map.put(student.nickname, map.get(student.nickname).get(1) + 1);
} else {
List<object> value = new Array<>();
value.add(nickname);
value.add(1);
map.put(student.nickname, value);
}
}// 輸出
for (value : map.values) {
System.out.println(value);
}
/<object>/<string>
從功能上,UnsafeRowGrouper 是這麼實現排序邏輯。當然肯定的是,也不是這麼“簡單”的實現。
具體怎麼實現的呢?我們在《MyCAT 源碼解析 —— 分片結果合併(二)》繼續分析。
4. 救護中心
看到此處的應該是真愛吧?!如果內容上有什麼錯誤或者難懂的地方,可以關注我的微信公眾號給我留言,我會很認真的逐條解答的。“萬一”覺得本文還可以,希望轉發到朋友圈讓更多的人看到。
閱讀更多 java互聯網架構 的文章