DBLE 心跳檢測模塊解析

熱愛技術、樂於分享的技術人,目前主要從事數據庫相關技術的研究。


概述

本文主要介紹 DBLE 心跳檢測模塊,內容包括心跳檢測作用及心跳檢測模塊源碼解析兩部分。


心跳檢測作用

DBLE 中心跳檢測的作用有以下三點:

1. 控制多個寫節點高可用切換;

2. 控制讀操作的負載均衡,會根據最近一次的心跳狀態,及主從延遲(如果配置了 slaveThreshold 主從延遲閾值的話)來控制讀負載均衡;

3. 控制空閒連接數大小,關閉多餘空閒連接。這裡發送的是 PING 包,需與 dataNodeIdleCheckPeriod 參數配合,超過此參數的空閒連接會通過發送 PING 包來檢查。

總的來講,就是判斷 MySQL 實例的狀態。

本文中主要講解前兩點涉及到的心跳檢測內容,第 3 點更適合在連接管理中講,本文暫不涉及。


心跳模塊源碼解析

心跳檢測定時任務開始入口在 Scheduler#init 方法中,以 dataNodeHeartbeatPeriod 間隔定期進行心跳檢測,默認值為 10 秒:

<code>scheduler.scheduleAtFixedRate(dataSourceHeartbeat(), 0L, system.getDataNodeHeartbeatPeriod(), TimeUnit.MILLISECONDS);/<code>

Scheduler#dataSourceHeartbeat 方法返回 Runnable 任務:

<code>private Runnable dataSourceHeartbeat() {        return new Runnable() {            @Override            public void run() {                timerExecutor.execute(new Runnable() {                    @Override                    public void run() {                    //這裡有個判斷,如果讀寫節點都沒有,自然不需要心跳檢測了                        if (!DbleServer.getInstance().getConfig().isDataHostWithoutWR()) {                            Map<string> hosts = DbleServer.getInstance().getConfig().getDataHosts();                            for (AbstractPhysicalDBPool host : hosts.values()) {                            //調用了AbstractPhysicalDBPool的doHeartbeat()方法                                host.doHeartbeat();                            }                        }                    }                });            }        };    }/<string>/<code>

AbstractPhysicalDBPool#doHeartbeat 為抽象方法,有兩個實現分別在類 PhysicalDNPoolSingleWH 和 PhysicalDBPool 中,這兩個類的區別從名字就可以看出來,一個是隻有一個 WriteHost,另一個則有多個 WriteHost,會根據你們的 schema.xml 中的具體配置決定初始化哪一個。

對於心跳檢測來說,基本實現都一樣,所以看哪一個類並不影響。

我們就來看下 PhysicalDNPoolSingleWH#doHeartbeat 方法吧:

<code>public void doHeartbeat() {        for (PhysicalDatasource source : allSourceMap.values()) {            if (source != null) {                source.doHeartbeat();            } else {                LOGGER.warn(hostName + " current dataSource is null!");            }        }    }/<code>

上述方法其實就是循環遍歷所有數據源,然後對每個數據源進行心跳檢測了。

繼續來看 PhysicalDatasource#doHeartbeat 方法,補充說明一下, PhysicalDatasource 也是抽象類,但在 DBLE 中只有 MySQLDatasource 一個實現,因為 DBLE 後端只支持 MySQL 嘛, MySQLDatasource#doHeartbeat 方法也是直接繼承了抽象類的實現:

<code>public void doHeartbeat() {        if (TimeUtil.currentTimeMillis() < heartbeatRecoveryTime) {            return;        }        if (!heartbeat.isStop()) {            //這裡直接調用了MySQLHeartbeat#heartbeat方法            heartbeat.heartbeat();        }    }/<code> 

再繼續看 MySQLHeartbeat#heartbeat 方法前,先來看下 MySQLDatasource 和 MySQLHeartbeat類之間的關係:


分佈式 | DBLE 心跳檢測模塊解析


它們之間的關係很簡單,就是 MySQLDatasource 會創建 MySQLHeartbeat,並且它們之間有一對一的關聯關係。

簡單來說就是一個 MySQLDatasource 對象就有一個 MySQLHeartbeat 對象來負責它的心跳檢測。

進一步來看 MySQLHeartbeat#heartbeat 方法:

<code>public void heartbeat() {        final ReentrantLock reentrantLock = this.lock;        reentrantLock.lock();        try {            if (isChecking.compareAndSet(false, true)) {                if (detector == null || detector.isQuit()) {                    try {                        detector = new MySQLDetector(this);                        detector.heartbeat();                    } catch (Exception e) {                        LOGGER.info(source.getConfig().toString(), e);                        setResult(ERROR_STATUS);                    }                } else {                    detector.heartbeat();                }            } else {                if (detector != null) {                    if (detector.isQuit()) {                        isChecking.compareAndSet(true, false);                    } else if (detector.isHeartbeatTimeout()) {                        setResult(TIMEOUT_STATUS);                    }                }            }        } finally {            reentrantLock.unlock();        }    }/<code>

上述方法主要是調用了 MySQLDetector#heartbeat 方法,調用鏈真的挺深的……:

<code>public void heartbeat() {        if (con == null || con.isClosed()) {            heartbeat.setResult(MySQLHeartbeat.ERROR_STATUS);            return;        }        //設置了發送心跳檢測的時間        lastSendQryTime = System.currentTimeMillis();        String[] fetchCols = {};        if (heartbeat.getSource().getHostConfig().isShowSlaveSql()) {            fetchCols = MYSQL_SLAVE_STATUS_COLS;        } else if (heartbeat.getSource().getHostConfig().isShowClusterSql()) {            fetchCols = MYSQL_CLUSTER_STATUS_COLS;        } else if (heartbeat.getSource().getHostConfig().isSelectReadOnlySql()) {            fetchCols = MYSQL_READ_ONLY_COLS;        }        if (LOGGER.isDebugEnabled()) {            LOGGER.debug("do heartbeat,conn is " + con);        }        OneRawSQLQueryResultHandler resultHandler = new OneRawSQLQueryResultHandler(fetchCols, this);        sqlJob = new HeartbeatSQLJob(heartbeat.getHeartbeatSQL(), con, resultHandler);        //執行心跳檢測任務        sqlJob.execute();    }/<code>

簡單說下該方法,該方法會根據你配置的心跳語句,實際執行檢測後端 MySQL 狀態,並存儲相應的數據,這裡涉及到了異步調用,檢測完成後將會回調 MySQLDetector#onResult 方法:

<code>public void onResult(SQLQueryResult> result) {        //設置心跳檢測完成後的時間        lastReceivedQryTime = System.currentTimeMillis();        heartbeat.getRecorder().set((lastReceivedQryTime - lastSendQryTime));        if (result.isSuccess()) {            PhysicalDatasource source = heartbeat.getSource();            Map<string> resultResult = result.getResult();            if (source.getHostConfig().isShowSlaveSql()) {                setStatusBySlave(source, resultResult);            } else if (source.getHostConfig().isShowClusterSql()) {                setStatusByCluster(resultResult);            } else if (source.getHostConfig().isSelectReadOnlySql()) {                setStatusByReadOnly(source, resultResult);            } else {                setStatusForNormalHeartbeat(source);            }        } else {            heartbeat.setResult(MySQLHeartbeat.ERROR_STATUS);        }    }/<string>/<code>

上述方法就是根據心跳檢測結果,來設置 MySQLHeartbeat 類中表示心跳狀態的各個變量了,比如 status變量, slaveBehindMaster 主從延遲時間變量。

上述整個過程就完成了數據源的心跳檢測,關於檢測結果的使用主要通過 MySQLHeartbeat 類中的 getStatus 和 getSlaveBehindMaster 方法,通過這兩個方法來判斷心跳是否成功,以及主從延遲多少,進而影響

數據源切換及讀寫分離邏輯,分別對應心跳檢測作用的第 1、2 點。


總結

本文主要講解了 DBLE 心跳檢測模塊,包括心跳檢測作用以及相應源碼解析,希望本文能幫助大家進一步理解心跳檢測模塊。


分享到:


相關文章: