Java大牛解析ZooKeeper Watcher 和 AsyncCallback 的區別與實現

前言

初學 Zookeeper 會發現客戶端有兩種回調方式: Watcher 和 AsyncCallback,而 Zookeeper 的使用是離不開這兩種方式的,搞清楚它們之間的區別與實現顯得尤為重要。本文將圍繞下面幾個方面展開

  • Watcher 和 AsyncCallback 的區別
  • Watcher 的回調實現
  • AsyncCallback 的回調實現
  • IO 與事件處理

Watcher 和 AsyncCallback 的區別

我們先通過一個例子來感受一下:

zooKeeper.getData(root, new Watcher() {
public void process(WatchedEvent event) {
}
}, new AsyncCallback.DataCallback() {
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
}
}, null);

可以看到,getData方法可以同時設置兩個回調:Watcher 和 AsyncCallback,同樣是回調,它們的區別是什麼呢?要解決這個問題,我們就得從這兩個接口的功能入手。

  • Watcher:Watcher是用於監聽節點,session 狀態的,比如getData對數據節點a設置了watcher,那麼當a的數據內容發生改變時,客戶端會收到NodeDataChanged通知,然後進行watcher的回調。
  • AsyncCallback:AsyncCallback是在以異步方式使用 ZooKeeper API 時,用於處理返回結果的。例如:getData同步調用的版本是:byte[] getData(String path, boolean watch,Stat stat),異步調用的版本是:void getData(String path,Watcher watcher,AsyncCallback.DataCallback cb,Object ctx),可以看到,前者是直接返回獲取的結果,後者是通過AsyncCallback回調處理結果的。

Watcher

Watcher 主要是通過ClientWatchManager進行管理的。下面是 Watcher 相關類圖

Java大牛解析ZooKeeper Watcher 和 AsyncCallback 的區別與實現

WatcherClass

添加 Watcher 的流程如下:

Java大牛解析ZooKeeper Watcher 和 AsyncCallback 的區別與實現

添加Watcher

Watcher 的類型

ClientWatchManager中有四種Watcher

  • defaultWatcher:創建Zookeeper連接時傳入的Watcher,用於監聽 session 狀態
  • dataWatches:存放getData傳入的Watcher
  • existWatches:存放exists傳入的Watcher,如果節點已存在,則Watcher會被添加到dataWatches
  • childWatches:存放getChildren傳入的Watcher

從代碼上可以發現,監聽器是存在HashMap中的,key是節點名稱path,value是Set

private final Map> dataWatches =
new HashMap>();
private final Map> existWatches =
new HashMap>();
private final Map> childWatches =
new HashMap>();
private volatile Watcher defaultWatcher;

通知的狀態類型與事件類型

在Watcher接口中,已經定義了所有的狀態類型和事件類型

  • KeeperState.Disconnected(0)
  • 此時客戶端處於斷開連接狀態,和ZK集群都沒有建立連接。
  • EventType.None(-1)
  • 觸發條件:一般是在與服務器斷開連接的時候,客戶端會收到這個事件。
  • KeeperState. SyncConnected(3)
  • 此時客戶端處於連接狀態
  • EventType.None(-1)
  • 觸發條件:客戶端與服務器成功建立會話之後,會收到這個通知。
  • EventType. NodeCreated (1)
  • 觸發條件:所關注的節點被創建。
  • EventType. NodeDeleted (2)
  • 觸發條件:所關注的節點被刪除。
  • EventType. NodeDataChanged (3)
  • 觸發條件:所關注的節點的內容有更新。注意,這個地方說的內容是指數據的版本號dataVersion。因此,即使使用相同的數據內容來更新,還是會收到這個事件通知的。無論如何,調用了更新接口,就一定會更新dataVersion的。
  • EventType. NodeChildrenChanged (4)
  • 觸發條件:所關注的節點的子節點有變化。這裡說的變化是指子節點的個數和組成,具體到子節點內容的變化是不會通知的。
  • KeeperState. AuthFailed(4)
  • 認證失敗
  • EventType.None(-1)
  • KeeperState. Expired(-112)
  • session 超時
  • EventType.None(-1)

materialize 方法

ClientWatchManager只有一個方法,那就是materialize,它根據事件類型type和path返回監聽該節點的特定類型的Watcher。

public Set materialize(Watcher.Event.KeeperState state,
Watcher.Event.EventType type, String path);

核心邏輯如下:

  1. type == None:返回所有Watcher,也就是說所有的Watcher都會被觸發。如果disableAutoWatchReset == true且當前state != SyncConnected,那麼還會清空Watcher,意味著移除所有在節點上的Watcher。
  2. type == NodeDataChanged | NodeCreated:返回監聽path節點的dataWatches & existWatches
  3. type == NodeChildrenChanged:返回監聽path節點的childWatches
  4. type == NodeDeleted:返回監聽path節點的dataWatches | childWatches

每次返回都會從HashMap中移除節點對應的Watcher,例如:addTo(dataWatches.remove(clientPath), result);,這就是為什麼Watcher是一次性的原因(defaultWatcher除外)。值得注意的是,由於使用的是HashSet存儲Watcher,重複添加同一個實例的Watcher也只會被觸發一次。

AsyncCallback

Zookeeper 的exists,getData,getChildren方法都有異步的版本,它們與同步方法的區別僅僅在於是否等待響應,底層發送都是通過sendThread異步發送的。下面我們用一幅圖來說明:

Java大牛解析ZooKeeper Watcher 和 AsyncCallback 的區別與實現

上面的圖展示了同步/異步調用getData的流程,其他方法也是類似的。

IO 與事件處理

Zookeeper 客戶端會啟動兩個常駐線程

  • SendThread:負責 IO 操作,包括髮送,接受響應,發送 ping 等。
  • EventThread:負責處理事件,執行回調函數。
Java大牛解析ZooKeeper Watcher 和 AsyncCallback 的區別與實現

readResponse

readResponse是SendThread處理響應的核心函數,核心邏輯如下:

  1. 接受服務器的響應,並反序列化出ReplyHeader: 有一個單獨的線程SendThread,負責接收服務器端的響應。假設接受到的服務器傳遞過來的字節流是incomingBuffer,那麼就將這個incomingBuffer反序列化為ReplyHeader。
  2. 判斷響應類型:判斷ReplyHeader是Watcher響應還是AsyncCallback響應:ReplyHeader.getXid()存儲了響應類型。
  3. 如果是Watcher類型響應:從ReplyHeader中創建WatchedEvent,WatchedEvent裡面存儲了節點的路徑,然後去WatcherManager中找到和這個節點相關聯的所有Watcher,將他們寫入到EventThread的waitingEvents中。
  4. 如果是AsyncCallback類型響應:從ReplyHeader中讀取response,這個response描述了是Exists,setData,getData,getChildren,create.....中的哪一個異步回調。從pendingQueue中拿到Packet,Packet中的cb存儲了AsyncCallback,也就是異步 API 的結果回調。最後將Packet寫入到EventThread的waitingEvents中。

processEvent

processEvent是EventThread處理事件核心函數,核心邏輯如下:

  1. 如果event instanceof WatcherSetEventPair,取出pair中的Watchers,逐個調用watcher.process(pair.event)
  2. 否則event為AsyncCallback,根據p.response判斷為哪種響應類型,執行響應的回調processResult。

可見,Watcher和AsyncCallback都是由EventThread處理的,通過processEvent進行區分處理。

總結

Zookeeper 客戶端中Watcher和AsyncCallback都是異步回調的方式,但它們回調的時機是不一樣的,前者是由服務器發送事件觸發客戶端回調,後者是在執行了請求後得到響應後客戶端主動觸發的。它們的共同點在於都需要在獲取了服務器響應之後,由SendThread寫入EventThread的waitingEvents中,然後由EventThread逐個從事件隊列中獲取並處理。

喜歡的點個關注,點點贊,私信回覆‘666’有福利。


分享到:


相關文章: