基於 Canal 和 Kafka 實現 MySQL 的 Binlog 實時同步

前言:

近段時間,業務系統架構基本完備,數據層面的建設比較薄弱,因為小楊目前工作重心在於搭建一個小型的數據平臺。優先級比較高的一個任務就是需要近實時同步業務系統的數據(包括保存、更新或者軟刪除)到一個另一個數據源,持久化之前需要清洗數據並且構建一個相對合理的便於後續業務數據統計、標籤系統構建等擴展功能的數據模型。基於當前團隊的資源和能力,優先調研了Alibaba開源中間件Canal的使用。

基於 Canal 和 Kafka 實現 MySQL 的 Binlog 實時同步

這篇文章簡單介紹一下如何快速地搭建一套Canal相關的組件。

關於Canal

簡介

下面的簡介和下一節的原理均來自於Canal項目的README:

基於 Canal 和 Kafka 實現 MySQL 的 Binlog 實時同步

Canal[kə'næl],譯意為水道/管道/溝渠,主要用途是基於MySQL數據庫增量日誌解析,提供增量數據訂閱和消費。早期阿里巴巴因為杭州和美國雙機房部署,存在跨機房同步的業務需求,實現方式主要是基於業務trigger獲取增量變更。從 2010 年開始,業務逐步嘗試數據庫日誌解析獲取增量變更進行同步,由此衍生出了大量的數據庫增量訂閱和消費業務。

基於日誌增量訂閱和消費的業務包括:

  • 數據庫鏡像
  • 數據庫實時備份
  • 索引構建和實時維護(拆分異構索引、倒排索引等)
  • 業務Cache刷新
  • 帶業務邏輯的增量數據處理

Canal 的工作原理

MySQL主備複製原理:

基於 Canal 和 Kafka 實現 MySQL 的 Binlog 實時同步

  • MySQL的Master實例將數據變更寫入二進制日誌(binary log,其中記錄叫做二進制日誌事件binary log events,可以通過show binlog events進行查看)
  • MySQL的Slave實例將master的binary log events拷貝到它的中繼日誌(relay log)
  • MySQL的Slave實例重放relay log中的事件,將數據變更反映它到自身的數據

Canal的工作原理如下:

  • Canal模擬MySQL Slave的交互協議,偽裝自己為MySQL Slave,向MySQL Master發送dump協議
  • MySQL Master收到dump請求,開始推送binary log給Slave(即Canal)
  • Canal解析binary log對象(原始為byte流),並且可以通過連接器發送到對應的消息隊列等中間件中

關於Canal的版本和部件

截止筆者開始編寫本文的時候(2020-03-05),Canal的最新發布版本是v1.1.5-alpha-1(2019-10-09發佈的),最新的正式版是v1.1.4(2019-09-02發佈的)。其中,v1.1.4主要添加了鑑權、監控的功能,並且做了一些列的性能優化,此版本集成的連接器是Tcp、Kafka和RockerMQ。而v1.1.5-alpha-1版本已經新增了RabbitMQ連接器,但是此版本的RabbitMQ連接器暫時不能定義連接RabbitMQ的端口號,不過此問題已經在master分支中修復(具體可以參看源碼中的CanalRabbitMQProducer類的提交記錄)。換言之,v1.1.4版本中目前能使用的內置連接器只有Tcp、Kafka和RockerMQ三種,如果想嚐鮮使用RabbitMQ連接器,可以選用下面的兩種方式之一:

  • 選用v1.1.5-alpha-1版本,但是無法修改RabbitMQ的port屬性,默認為5672。
  • 基於master分支自行構建Canal。

目前,Canal項目的活躍度比較高,但是考慮到功能的穩定性問題,筆者建議選用穩定版本在生產環境中實施,當前可以選用v1.1.4版本,本文的例子用選用的就是v1.1.4版本,配合Kafka連接器使用。Canal主要包括三個核心部件:

  • canal-admin:後臺管理模塊,提供面向WebUI的Canal管理能力。
  • canal-adapter:適配器,增加客戶端數據落地的適配及啟動功能,包括REST、日誌適配器、關係型數據庫的數據同步(表對錶同步)、HBase數據同步、ES數據同步等等。
  • canal-deployer:發佈器,核心功能所在,包括binlog解析、轉換和發送報文到連接器中等等功能都由此模塊提供。

一般情況下,canal-deployer部件是必須的,其他兩個部件按需選用即可。

部署所需的中間件

搭建一套可以用的組件需要部署MySQL、Zookeeper、Kafka和Canal四個中間件的實例,下面簡單分析一下部署過程。選用的虛擬機系統是CentOS7。

安裝MySQL

為了簡單起見,選用yum源安裝(官方鏈接是http://dev.mysql.com/downloads/repo/yum):

基於 Canal 和 Kafka 實現 MySQL 的 Binlog 實時同步

mysql80-community-release-el7-3雖然包名帶了mysql80關鍵字,其實已經集成了MySQL主流版本5.6、5.7和8.x等等的最新安裝包倉庫

選用的是最新版的MySQL8.x社區版,下載CentOS7適用的rpm包:

<code>cd /data/mysql
wget http://dev.mysql.com/get/mysql80-community-release-el7-3.noarch.rpm
// 下載完畢之後

sudo rpm -Uvh mysql80-community-release-el7-3.noarch.rpm/<code>

此時列舉一下yum倉庫裡面的MySQL相關的包:

<code>[root@localhost mysql]# yum repolist all | grep mysql
mysql-cluster-7.5-community/x86_64 MySQL Cluster 7.5 Community   disabled
mysql-cluster-7.5-community-source MySQL Cluster 7.5 Community - disabled
mysql-cluster-7.6-community/x86_64 MySQL Cluster 7.6 Community   disabled
mysql-cluster-7.6-community-source MySQL Cluster 7.6 Community - disabled
mysql-cluster-8.0-community/x86_64 MySQL Cluster 8.0 Community   disabled
mysql-cluster-8.0-community-source MySQL Cluster 8.0 Community - disabled
mysql-connectors-community/x86_64  MySQL Connectors Community    enabled:    141
mysql-connectors-community-source  MySQL Connectors Community -  disabled
mysql-tools-community/x86_64       MySQL Tools Community         enabled:    105
mysql-tools-community-source       MySQL Tools Community - Sourc disabled
mysql-tools-preview/x86_64         MySQL Tools Preview           disabled
mysql-tools-preview-source         MySQL Tools Preview - Source  disabled
mysql55-community/x86_64           MySQL 5.5 Community Server    disabled
mysql55-community-source           MySQL 5.5 Community Server -  disabled
mysql56-community/x86_64           MySQL 5.6 Community Server    disabled
mysql56-community-source           MySQL 5.6 Community Server -  disabled
mysql57-community/x86_64           MySQL 5.7 Community Server    disabled
mysql57-community-source           MySQL 5.7 Community Server -  disabled
mysql80-community/x86_64           MySQL 8.0 Community Server    enabled:    161
mysql80-community-source           MySQL 8.0 Community Server -  disabled/<code>

編輯/etc/yum.repos.d/mysql-community.repo文件([mysql80-community]塊中enabled設置為1,其實默認就是這樣子,不用改,如果要選用5.x版本則需要修改對應的塊):

<code>[mysql80-community]
name=MySQL 8.0 Community Server
baseurl=http://repo.mysql.com/yum/mysql-8.0-community/el/7/$basearch/
enabled=1
gpgcheck=1
gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-mysql/<code>

然後安裝MySQL服務:

<code>sudo yum install mysql-community-server/<code>

這個過程比較漫長,因為需要下載和安裝5個rpm安裝包(或者是所有安裝包組合的壓縮包mysql-8.0.18-1.el7.x86_64.rpm-bundle.tar)。如果網絡比較差,也可以直接從官網手動下載後安裝:

基於 Canal 和 Kafka 實現 MySQL 的 Binlog 實時同步

<code>// 下載下面5個rpm包 common --> libs --> libs-compat --> client --> server
mysql-community-common
mysql-community-libs
mysql-community-libs-compat
mysql-community-client
mysql-community-server

// 強制安裝
rpm -ivh mysql-community-common-8.0.18-1.el7.x86_64.rpm --force --nodeps
rpm -ivh mysql-community-libs-8.0.18-1.el7.x86_64.rpm --force --nodeps
rpm -ivh mysql-community-libs-compat-8.0.18-1.el7.x86_64.rpm --force --nodeps
rpm -ivh mysql-community-client-8.0.18-1.el7.x86_64.rpm --force --nodeps
rpm -ivh mysql-community-server-8.0.18-1.el7.x86_64.rpm --force --nodeps/<code>

安裝完畢之後,啟動MySQL服務,然後搜索MySQL服務的root賬號的臨時密碼用於首次登陸(mysql -u root -p):

<code>// 啟動服務,關閉服務就是service mysqld stop
service mysqld start
// 查看臨時密碼 cat /var/log/mysqld.log
[root@localhost log]# cat /var/log/mysqld.log 
2020-03-02T06:03:53.996423Z 0 [System] [MY-013169] [Server] /usr/sbin/mysqld (mysqld 8.0.18) initializing of server in progress as process 22780
2020-03-02T06:03:57.321447Z 5 [Note] [MY-010454] [Server] A temporary password is generated for root@localhost: >kjYaXENK6li
2020-03-02T06:04:00.123845Z 0 [System] [MY-010116] [Server] /usr/sbin/mysqld (mysqld 8.0.18) starting as process 22834
// 登錄臨時root用戶,使用臨時密碼
[root@localhost log]# mysql -u root -p/<code>

接下來做下面的操作:

  • 修改root用戶的密碼:ALTER USER 'root'@'localhost' IDENTIFIED BY 'QWqw12!@';(注意密碼規則必須包含大小寫字母、數字和特殊字符)
  • 更新root的host,切換數據庫use mysql;,指定host為%以便可以讓其他服務器遠程訪問UPDATE USER SET HOST = '%' WHERE USER = 'root';
  • 賦予'root'@'%'用戶,所有權限,執行GRANT ALL PRIVILEGES ON . TO 'root'@'%';
  • 改變root'@'%用戶的密碼校驗規則以便可以使用Navicat等工具訪問:ALTER USER 'root'@'%' IDENTIFIED WITH mysql_native_password BY 'QWqw12!@';
基於 Canal 和 Kafka 實現 MySQL 的 Binlog 實時同步

操作完成之後,就可以使用root用戶遠程訪問此虛擬機上的MySQL服務。最後確認是否開啟了binlog(注意一點是MySQL8.x默認開啟binlog)SHOW VARIABLES LIKE '%bin%';:

基於 Canal 和 Kafka 實現 MySQL 的 Binlog 實時同步

最後在MySQL的Shell執行下面的命令,新建一個用戶名canal密碼為QWqw12!@的新用戶,賦予REPLICATION SLAVE和 REPLICATION CLIENT權限:

<code>CREATE USER canal IDENTIFIED BY 'QWqw12!@';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'QWqw12!@';/<code>

切換回去root用戶,創建一個數據庫test:

<code>CREATE DATABASE `test` CHARSET `utf8mb4` COLLATE `utf8mb4_unicode_ci`;/<code>

安裝Zookeeper

Canal和Kafka集群都依賴於Zookeeper做服務協調,為了方便管理,一般會獨立部署Zookeeper服務或者Zookeeper集群。筆者這裡選用2020-03-04發佈的3.6.0版本:

<code>midkr /data/zk
# 創建數據目錄
midkr /data/zk/data
cd /data/zk
wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.6.0/apache-zookeeper-3.6.0-bin.tar.gz
tar -zxvf apache-zookeeper-3.6.0-bin.tar.gz
cd apache-zookeeper-3.6.0-bin/conf
cp zoo_sample.cfg zoo.cfg && vim zoo.cfg/<code>

把zoo.cfg文件中的dataDir設置為/data/zk/data,然後啟動Zookeeper:

<code>[root@localhost conf]# sh /data/zk/apache-zookeeper-3.6.0-bin/bin/zkServer.sh start
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /data/zk/apache-zookeeper-3.6.0-bin/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED/<code>

這裡注意一點,要啟動此版本的Zookeeper服務必須本地安裝好JDK8+,這一點需要自行處理。啟動的默認端口是2181,啟動成功後的日誌如下:

基於 Canal 和 Kafka 實現 MySQL 的 Binlog 實時同步

安裝Kafka

Kafka是一個高性能分佈式消息隊列中間件,它的部署依賴於Zookeeper。筆者在此選用2.4.0並且Scala版本為2.13的安裝包:

<code>mkdir /data/kafka
mkdir /data/kafka/data
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.4.0/kafka_2.13-2.4.0.tgz
tar -zxvf kafka_2.13-2.4.0.tgz/<code>

由於解壓後/data/kafka/kafka_2.13-2.4.0/config/server.properties配置中對應的zookeeper.connect=localhost:2181已經符合需要,不必修改,需要修改日誌文件的目錄log.dirs為/data/kafka/data。然後啟動Kafka服務:

<code>sh /data/kafka/kafka_2.13-2.4.0/bin/kafka-server-start.sh /data/kafka/kafka_2.13-2.4.0/config/server.properties/<code>
基於 Canal 和 Kafka 實現 MySQL 的 Binlog 實時同步

這樣啟動一旦退出控制檯就會結束Kafka進程,可以添加-daemon參數用於控制Kafka進程後臺不掛斷運行。

<code>sh /data/kafka/kafka_2.13-2.4.0/bin/kafka-server-start.sh -daemon /data/kafka/kafka_2.13-2.4.0/config/server.properties/<code>

安裝和使用Canal

終於到了主角登場,這裡選用Canal的v1.1.4穩定發佈版,只需要下載deployer模塊:

<code>mkdir /data/canal
cd /data/canal
# 這裡注意一點,Github在國內被牆,下載速度極慢,可以先用其他下載工具下載完再上傳到服務器中
wget http://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
tar -zxvf canal.deployer-1.1.4.tar.gz/<code>

解壓後的目錄如下:

<code>- bin   # 運維腳本
- conf  # 配置文件
  canal_local.properties  # canal本地配置,一般不需要動
  canal.properties        # canal服務配置
  logback.xml             # logback日誌配置
  metrics                 # 度量統計配置
  spring                  # spring-實例配置,主要和binlog位置計算、一些策略配置相關,可以在canal.properties選用其中的任意一個配置文件
  example                 # 實例配置文件夾,一般認為單個數據庫對應一個獨立的實例配置文件夾
    instance.properties   # 實例配置,一般指單個數據庫的配置

- lib   # 服務依賴包
- logs  # 日誌文件輸出目錄/<code>

在開發和測試環境建議把logback.xml的日誌級別修改為DEBUG方便定位問題。這裡需要關注canal.properties和instance.properties兩個配置文件。canal.properties文件中,需要修改:

  • 去掉canal.instance.parser.parallelThreadSize = 16這個配置項的註釋,也就是啟用此配置項,和實例解析器的線程數相關,不配置會表現為阻塞或者不進行解析。
  • canal.serverMode配置項指定為kafka,可選值有tcp、kafka和rocketmq(master分支或者最新的的v1.1.5-alpha-1版本,可以選用rabbitmq),默認是kafka。
  • canal.mq.servers配置需要指定為Kafka服務或者集群Broker的地址,這裡配置為127.0.0.1:9092。

canal.mq.servers在不同的canal.serverMode有不同的意義。kafka模式下,指Kafka服務或者集群Broker的地址,也就是bootstrap.serversrocketmq模式下,指NameServer列表rabbitmq模式下,指RabbitMQ服務的Host和Port

其他配置項可以參考下面兩個官方Wiki的鏈接:

  • Canal-Kafka-RocketMQ-QuickStart
  • AdminGuide

instance.properties一般指一個數據庫實例的配置,Canal架構支持一個Canal服務實例,處理多個數據庫實例的binlog異步解析。instance.properties需要修改的配置項主要包括:

  • canal.instance.mysql.slaveId需要配置一個和Master節點的服務ID完全不同的值,這裡筆者配置為654321。
  • 配置數據源實例,包括地址、用戶、密碼和目標數據庫:
    • canal.instance.master.address,這裡指定為127.0.0.1:3306。
    • canal.instance.dbUsername,這裡指定為canal。
    • canal.instance.dbPassword,這裡指定為QWqw12!@。
    • 新增canal.instance.defaultDatabaseName,這裡指定為test(需要在MySQL中建立一個test數據庫,見前面的流程)。
  • Kafka相關配置,這裡暫時使用靜態topic和單個partition:
    • canal.mq.topic,這裡指定為test,也就是解析完的binlog結構化數據會發送到Kafka的命名為test的topic中。
    • canal.mq.partition,這裡指定為0。

配置工作做好之後,可以啟動Canal服務:

<code>sh /data/canal/bin/startup.sh 
# 查看服務日誌
tail -100f /data/canal/logs/canal/canal
# 查看實例日誌  -- 一般情況下,關注實例日誌即可

tail -100f /data/canal/logs/example/example.log/<code>

啟動正常後,見實例日誌如下:

基於 Canal 和 Kafka 實現 MySQL 的 Binlog 實時同步

在test數據庫創建一個訂單表,並且執行幾個簡單的DML:

<code>use `test`;

CREATE TABLE `order`
(
    id          BIGINT UNIQUE PRIMARY KEY AUTO_INCREMENT COMMENT '主鍵',
    order_id    VARCHAR(64)    NOT NULL COMMENT '訂單ID',
    amount      DECIMAL(10, 2) NOT NULL DEFAULT 0 COMMENT '訂單金額',
    create_time DATETIME       NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '創建時間',
    UNIQUE uniq_order_id (`order_id`)
) COMMENT '訂單表';


INSERT INTO `order`(order_id, amount) VALUES ('10086', 999);
UPDATE `order` SET amount = 10087 WHERE order_id = '10086';
DELETE  FROM `order` WHERE order_id = '10086';/<code>

這個時候,可以利用Kafka的kafka-console-consumer或者Kafka Tools查看test這個topic的數據:

<code>sh /data/kafka/kafka_2.13-2.4.0/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic test/<code>
基於 Canal 和 Kafka 實現 MySQL 的 Binlog 實時同步

具體的數據如下:

<code>// test數據庫建庫腳本
{"data":null,"database":"`test`","es":1583143732000,"id":1,"isDdl":false,"mysqlType":null,"old":null,"pkNames":null,"sql":"CREATE DATABASE `test` CHARSET `utf8mb4` COLLATE `utf8mb4_unicode_ci`","sqlType":null,"table":"","ts":1583143930177,"type":"QUERY"}

// order表建表DDL
{"data":null,"database":"test","es":1583143957000,"id":2,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"CREATE TABLE `order`\\n(\\n    id          BIGINT UNIQUE PRIMARY KEY AUTO_INCREMENT COMMENT '主鍵',\\n    order_id    VARCHAR(64)    NOT NULL COMMENT '訂單ID',\\n    amount      DECIMAL(10, 2) NOT NULL DEFAULT 0 COMMENT '訂單金額',\\n    create_time DATETIME       NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '創建時間',\\n    UNIQUE uniq_order_id (`order_id`)\\n) COMMENT '訂單表'","sqlType":null,"table":"order","ts":1583143958045,"type":"CREATE"}


// INSERT
{"data":[{"id":"1","order_id":"10086","amount":"999.0","create_time":"2020-03-02 05:12:49"}],"database":"test","es":1583143969000,"id":3,"isDdl":false,"mysqlType":{"id":"BIGINT","order_id":"VARCHAR(64)","amount":"DECIMAL(10,2)","create_time":"DATETIME"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"order_id":12,"amount":3,"create_time":93},"table":"order","ts":1583143969460,"type":"INSERT"}

// UPDATE
{"data":[{"id":"1","order_id":"10086","amount":"10087.0","create_time":"2020-03-02 05:12:49"}],"database":"test","es":1583143974000,"id":4,"isDdl":false,"mysqlType":{"id":"BIGINT","order_id":"VARCHAR(64)","amount":"DECIMAL(10,2)","create_time":"DATETIME"},"old":[{"amount":"999.0"}],"pkNames":["id"],"sql":"","sqlType":{"id":-5,"order_id":12,"amount":3,"create_time":93},"table":"order","ts":1583143974870,"type":"UPDATE"}

// DELETE
{"data":[{"id":"1","order_id":"10086","amount":"10087.0","create_time":"2020-03-02 05:12:49"}],"database":"test","es":1583143980000,"id":5,"isDdl":false,"mysqlType":{"id":"BIGINT","order_id":"VARCHAR(64)","amount":"DECIMAL(10,2)","create_time":"DATETIME"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"order_id":12,"amount":3,"create_time":93},"table":"order","ts":1583143981091,"type":"DELETE"}/<code>

可見Kafka的名為test的topic已經寫入了對應的結構化binlog事件數據,可以編寫消費者監聽Kafka對應的topic然後對獲取到的數據進行後續處理。

總結:

這篇文章大部分篇幅用於介紹其他中間件是怎麼部署的,這個問題側面說明了Canal本身部署並不複雜,它的配置文件屬性項比較多,但是實際上需要自定義和改動的配置項是比較少的,也就是說明了它的運維成本和學習成本並不高。後面會分析基於結構化binlog事件做ELT和持久化相關工作以及Canal的生產環境可用級別HA集群的搭建。


分享到:


相關文章: