02.26 TiDB Binlog 源碼閱讀系列文章 (九)同步數據到下游


完結篇 | TiDB Binlog 源碼閱讀系列文章 (九)同步數據到下游


上篇文章介紹了用於將 binlog 同步到 MySQL / TiDB 的 Loader package,本文往回退一步,介紹 Drainer 同步到不同下游的機制。

TiDB Binlog(github.com/pingcap/tidb-binlog)用於收集 TiDB 的 binlog,並準實時同步給下游。 同步數據這一步重要操作由 Drainer 模塊支持,它可以將 binlog 同步到 TiDB / MySQL / Kafka / File (增量備份)等下游組件。

  • 對於 TiDB 和 MySQL 兩種類型的下游組件,Drainer 會從 binlog 中還原出對應的 SQL 操作在下游直接執行;
  • 對於 Kafka 和 File(增量備份)兩種類型的下游組件,輸出約定編碼格式的 binlog。用戶可以定製後續各種處理流程,如更新搜索引擎索引、清除緩存、增量備份等。TiDB Binlog 自帶工具 Reparo 實現了將增量備份數據(下游類型為 File(增量備份))同步到 TiDB / MySQL 的功能。

本文將按以下幾個小節介紹 Drainer 如何將收到的 binlog 同步到下游:

  1. Drainer Sync 模塊:Drainer 通過 Sync 模塊調度整個同步過程,所有的下游相關的同步邏輯統一封裝成了 Syncer 接口。
  2. 恢復工具 Reparo (讀音:reh-PAH-roh):從下游保存的 File(增量備份)中讀取 binlog 同步到 TiDB / MySQL。

Drainer Sync 模塊

Syncer

同步機制的核心是 Syncer 接口,定義如下:

<code>// Syncer sync binlog item to downstreamtype Syncer interface {  // Sync the binlog item to downstream  Sync(item *Item) error  // will be close if Close normally or meet error, call Error() to check it  Successes() /<code>

其中 Sync 方法表示異步地向下遊同步一個 binlog,對應的參數類型是 *Item,這是一個封裝了 binlog 的結構體;Successes 方法返回一個 channel,從中可以讀取已經成功同步到下游的 Item;Error 方法返回一個 channel,當 Syncer 同步過程出錯中斷時,會往這個 channel 發送遇到的錯誤;Close 用於關掉 Syncer,釋放資源。

支持的每個下游類型在 drainer/sync 目錄下都有一個對應的 Syncer 實現,例如 MySQL 對應的是 mysql.go 裡的 MySQLSyncer,Kafka 對應的是 kafka.go 裡的 KafkaSyncer。Drainer 啟動時,會根據配置文件中指定的下游,找到對應的 Syncer 實現,然後就可以用統一的接口管理整個同步過程了。

Checkpoint

同步進程可能因為各種原因退出,重啟後要恢復同步就需要知道上次同步的進度。在 Drainer 裡記錄同步進度的功能抽象成 Checkpoint 接口,其定義如下:

<code>type CheckPoint interface {  // Load loads checkpoint information.  Load() error  // Save saves checkpoint information.  Save(int64) error  // Pos gets position information.  TS() int64  // Close closes the CheckPoint and release resources, after closed other methods should not be called again.  Close() error}/<code>

從以上定義中可以看到,Save 的參數和 TS 的返回結果都是 int64 類型,因為同步的進度是以 TiDB 中單調遞增的 commit timestamp 來記錄的,它的類型就是 int64。

Drainer 支持不同類型的 Checkpoint 實現,例如 mysql.go 裡的 MySQLCheckpoint,默認將 commit timestamp 寫到 tidb_binlog 庫下的 checkpoint 表。Drainer 會根據下游類型自動選擇不同的 Checkpoint 實現,例如 TiDB / MySQL 的下游就會使用 MySQLCheckPoint,File(增量備份) 則使用 PbCheckpoint。

在 Syncer 小節,我們看到 Syncer 的 Successes 方法提供了一個 channel 用來接收已經處理完畢的 binlog,收到 binlog 後,我們用 Checkpoint 的 Save 方法保存 binlog 的 commit timestamp 就可以記下同步進度,細節可查看源碼中的 handleSuccess 方法。

Translator

Syncer 在收到 binlog 後需要將裡面記錄的變更轉換成適合下游 Syncer 類型的格式,這部分實現在 drainer/translator 包。

以下游是 MySQL / TiDB 的情況為例。MySQLSyncer.Sync 會先調用 TiBinlogToTxn

將 binlog 轉換成 loader.Txn 以便接入下層的 loader 模塊 (loader 接收一個個 loader.Txn 結構並還原成對應的 SQL 批量寫入 MySQL / TiDB)。

loader.Txn 定義如下:

<code>// Txn holds transaction info, an DDL or DML sequencestype Txn struct {  DMLs []*DML  DDL  *DDL  // This field is used to hold arbitrary data you wish to include so it  // will be available when receiving on the Successes channel  Metadata interface{}}/<code>

Txn 主要有兩類:DDL 和 DML。Metadata 目前放的就是傳給 Sync 的 *Item 對象。DDL 的情況比較簡單,因為 binlog 中已經直接包含了我們要用到的 DDL Query。DML 則需要遍歷 binlog 中的一個個行變更,根據它的類型 insert / update / delete 還原成相應的 loader.DML。

Schema

上個小節中,我們提到了對行變更數據的解析,在 binlog 中編碼的行變更是沒有列信息的,我們需要查到對應版本的列信息才能還原出 SQL 語義。Schema 就是解決這個問題的模塊。

在 Drainer 啟動時,會調用 loadHistoryDDLJobs 從 TiKV 處查詢截至當前時間所有已完成的 DDL Job 記錄,按 SchemaVersion 升序排序(可以粗略認為這是一個單調遞增地賦給每個 DDL 任務的版本號)。這些記錄在 Syncer 中會用於創建一個 Schema 對象。在運行過程中,Drainer 每遇到一條 DDL 也會添加到 Schema 中。

binlog 中帶有一個 SchemaVersion 信息,記錄這條 binlog 生成的時刻 Schema 版本。在同步 Binlog 前,我們會先用這個 SchemaVersion 信息調用 Schema 的一個方法 handlePreviousDDLJobIfNeed。上一段中我們看到 Schema 從何處收集到有序的 DDL Job 記錄,這個方法則是按順序應用 SchemaVersion 小於等於指定版本的 DDL Job,在 Schema 中維護每個表對應版本的最新結構信息,去掉一些錯誤代碼後實現大致如下:

<code>func (s *Schema) handlePreviousDDLJobIfNeed(version int64) error {  var i int  for i = 0; i < len(s.jobs); i++ {     if s.jobs[i].BinlogInfo.SchemaVersion <= version {        _, _, _, err := s.handleDDL(s.jobs[i])        if err != nil {           return errors.Annotatef(err, "handle ddl job %v failed, the schema info: %s", s.jobs[i], s)        }     } else {        break     }  }  s.jobs = s.jobs[i:]  return nil}/<code>

對於每個符合條件的 Job,由 handleDDL 方法將其表結構 TableInfo 等信息更新到 Schema 中,其他模塊就可以查詢到表格當前最新的信息。

恢復工具

我們知道 Drainer 除了可以將 binlog 直接還原到下游數據庫以外,還支持同步到其他外部存儲系統塊,所以我們也提供了相應的工具來處理存儲下來的文件,Reparo 是其中之一,用於讀取存儲在文件系統中的 binlog 文件,寫入 TiDB 中。本節簡單介紹下 Reparo 的用途與實現,讀者可以作為示例瞭解如何處理同步到文件系統的 binlog 增量備份。

Reparo

Reparo 可以讀取同步到文件系統上的 binlog 增量備份並同步到 TiDB。

讀取 binlog

當下遊設置成 File(增量備份) 時,Drainer 會將 Protobuf 編碼的 binlog 保存到指定目錄,每寫滿 512 MB 新建一個文件。每個文件有個編號,從 0 開始依次類推。文件名格式定義如下:

<code>// BinlogName creates a binlog file name. The file name format is like binlog-0000000000000001-20181010101010func BinlogName(index uint64) string {  currentTime := time.Now()  return binlogNameWithDateTime(index, currentTime)}// binlogNameWithDateTime creates a binlog file name.func binlogNameWithDateTime(index uint64, datetime time.Time) string {  return fmt.Sprintf("binlog-%016d-%s", index, datetime.Format(datetimeFormat))}/<code>

文件的前綴都是 “binlog-”,後面跟一個 16 位右對齊的編號和一個時間戳。將目錄裡的文件按字母順序排序就可以得到按編號排序的 binlog 文件名。從指定目錄獲取文件列表的實現如下:

<code>// ReadDir reads and returns all file and dir names from directoryfunc ReadDir(dirpath string) ([]string, error) {  dir, err := os.Open(dirpath)  if err != nil {     return nil, errors.Trace(err)  }  defer dir.Close()  names, err := dir.Readdirnames(-1)  if err != nil {     return nil, errors.Annotatef(err, "dir %s", dirpath)  }  sort.Strings(names)  return names, nil}/<code>

這個函數簡單地獲取目錄裡全部文件名,排序後返回。在上層還做了一些過濾來去掉臨時文件等。得到文件列表後,Reparo 會用標準庫的 bufio.NewReader 逐個打開文件,然後用 Decode 函數讀出其中的一條條 binlog:

<code>func Decode(r io.Reader) (*pb.Binlog, int64, error) {  payload, length, err := binlogfile.Decode(r)  if err != nil {     return nil, 0, errors.Trace(err)  }  binlog := &pb.Binlog{}  err = binlog.Unmarshal(payload)  if err != nil {     return nil, 0, errors.Trace(err)  }  return binlog, length, nil}/<code>

這裡先調用了 binlogfile.Decode 從文件中解析出對應 Protobuf 編碼的一段二進制數據然後解碼出 binlog。

寫入 TiDB

得到 binlog 後就可以準備寫入 TiDB。Reparo 這部分實現像一個簡化版的 Drainer 的 Sync 模塊,同樣有一個 Syncer 接口以及幾個具體實現(除了 mysqlSyncer 還有用於調試的 printSyncer 和 memSyncer),所以就不再介紹。值得一提的是,這裡也跟前面很多 MySQL / TiDB 同步相關的模塊一樣使用了 loader 模塊。

小結

本文介紹了 Drainer 是如何實現數據同步的以及 Reparo 如何從文件系統中恢復增量備份數據到 MySQL / TiDB。在 Drainer 中,Syncer 封裝了同步到各個下游模塊的具體細節,Checkpoint 記錄同步進度,Translator 從 binlog 中還原出具體的變更,Schema 在內存中維護每個表對應的表結構定義。

TiDB Binlog 源碼閱讀系列在此就全部完結了,相信大家通過本系列文章更全面地理解了 TiDB Binlog 的原理和實現細節。我們將繼續打磨優化,歡迎大家給我們反饋使用過程中遇到的問題或建議;如果社區小夥伴們想參與 TiDB Binlog 的設計、開發和測試,也歡迎與我們聯繫 [email protected],或者在 Repo 中提 issue 討論。


分享到:


相關文章: