前言
初學 Zookeeper 會發現客戶端有兩種回調方式: Watcher 和 AsyncCallback,而 Zookeeper 的使用是離不開這兩種方式的,搞清楚它們之間的區別與實現顯得尤為重要。本文將圍繞下面幾個方面展開
- Watcher 和 AsyncCallback 的區別
- Watcher 的回調實現
- AsyncCallback 的回調實現
- IO 與事件處理
Watcher 和 AsyncCallback 的區別
我們先通過一個例子來感受一下:
zooKeeper.getData(root, new Watcher() {
public void process(WatchedEvent event) {
}
}, new AsyncCallback.DataCallback() {
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
}
}, null);
可以看到,getData方法可以同時設置兩個回調:Watcher 和 AsyncCallback,同樣是回調,它們的區別是什麼呢?要解決這個問題,我們就得從這兩個接口的功能入手。
- Watcher:Watcher是用於監聽節點,session 狀態的,比如getData對數據節點a設置了watcher,那麼當a的數據內容發生改變時,客戶端會收到NodeDataChanged通知,然後進行watcher的回調。
- AsyncCallback:AsyncCallback是在以異步方式使用 ZooKeeper API 時,用於處理返回結果的。例如:getData同步調用的版本是:byte[] getData(String path, boolean watch,Stat stat),異步調用的版本是:void getData(String path,Watcher watcher,AsyncCallback.DataCallback cb,Object ctx),可以看到,前者是直接返回獲取的結果,後者是通過AsyncCallback回調處理結果的。
Watcher
Watcher 主要是通過ClientWatchManager進行管理的。下面是 Watcher 相關類圖
WatcherClass
添加 Watcher 的流程如下:
添加Watcher
Watcher 的類型
ClientWatchManager中有四種Watcher
- defaultWatcher:創建Zookeeper連接時傳入的Watcher,用於監聽 session 狀態
- dataWatches:存放getData傳入的Watcher
- existWatches:存放exists傳入的Watcher,如果節點已存在,則Watcher會被添加到dataWatches
- childWatches:存放getChildren傳入的Watcher
從代碼上可以發現,監聽器是存在HashMap中的,key是節點名稱path,value是Set
private final Map> dataWatches =
new HashMap>();
private final Map> existWatches =
new HashMap>();
private final Map> childWatches =
new HashMap>();
private volatile Watcher defaultWatcher;
通知的狀態類型與事件類型
在Watcher接口中,已經定義了所有的狀態類型和事件類型
- KeeperState.Disconnected(0)
- 此時客戶端處於斷開連接狀態,和ZK集群都沒有建立連接。
- EventType.None(-1)
- 觸發條件:一般是在與服務器斷開連接的時候,客戶端會收到這個事件。
- KeeperState. SyncConnected(3)
- 此時客戶端處於連接狀態
- EventType.None(-1)
- 觸發條件:客戶端與服務器成功建立會話之後,會收到這個通知。
- EventType. NodeCreated (1)
- 觸發條件:所關注的節點被創建。
- EventType. NodeDeleted (2)
- 觸發條件:所關注的節點被刪除。
- EventType. NodeDataChanged (3)
- 觸發條件:所關注的節點的內容有更新。注意,這個地方說的內容是指數據的版本號dataVersion。因此,即使使用相同的數據內容來更新,還是會收到這個事件通知的。無論如何,調用了更新接口,就一定會更新dataVersion的。
- EventType. NodeChildrenChanged (4)
- 觸發條件:所關注的節點的子節點有變化。這裡說的變化是指子節點的個數和組成,具體到子節點內容的變化是不會通知的。
- KeeperState. AuthFailed(4)
- 認證失敗
- EventType.None(-1)
- KeeperState. Expired(-112)
- session 超時
- EventType.None(-1)
materialize 方法
ClientWatchManager只有一個方法,那就是materialize,它根據事件類型type和path返回監聽該節點的特定類型的Watcher。
public Setmaterialize(Watcher.Event.KeeperState state,
Watcher.Event.EventType type, String path);
核心邏輯如下:
- type == None:返回所有Watcher,也就是說所有的Watcher都會被觸發。如果disableAutoWatchReset == true且當前state != SyncConnected,那麼還會清空Watcher,意味著移除所有在節點上的Watcher。
- type == NodeDataChanged | NodeCreated:返回監聽path節點的dataWatches & existWatches
- type == NodeChildrenChanged:返回監聽path節點的childWatches
- type == NodeDeleted:返回監聽path節點的dataWatches | childWatches
每次返回都會從HashMap中移除節點對應的Watcher,例如:addTo(dataWatches.remove(clientPath), result);,這就是為什麼Watcher是一次性的原因(defaultWatcher除外)。值得注意的是,由於使用的是HashSet存儲Watcher,重複添加同一個實例的Watcher也只會被觸發一次。
AsyncCallback
Zookeeper 的exists,getData,getChildren方法都有異步的版本,它們與同步方法的區別僅僅在於是否等待響應,底層發送都是通過sendThread異步發送的。下面我們用一幅圖來說明:
上面的圖展示了同步/異步調用getData的流程,其他方法也是類似的。
IO 與事件處理
Zookeeper 客戶端會啟動兩個常駐線程
- SendThread:負責 IO 操作,包括髮送,接受響應,發送 ping 等。
- EventThread:負責處理事件,執行回調函數。
readResponse
readResponse是SendThread處理響應的核心函數,核心邏輯如下:
- 接受服務器的響應,並反序列化出ReplyHeader: 有一個單獨的線程SendThread,負責接收服務器端的響應。假設接受到的服務器傳遞過來的字節流是incomingBuffer,那麼就將這個incomingBuffer反序列化為ReplyHeader。
- 判斷響應類型:判斷ReplyHeader是Watcher響應還是AsyncCallback響應:ReplyHeader.getXid()存儲了響應類型。
- 如果是Watcher類型響應:從ReplyHeader中創建WatchedEvent,WatchedEvent裡面存儲了節點的路徑,然後去WatcherManager中找到和這個節點相關聯的所有Watcher,將他們寫入到EventThread的waitingEvents中。
- 如果是AsyncCallback類型響應:從ReplyHeader中讀取response,這個response描述了是Exists,setData,getData,getChildren,create.....中的哪一個異步回調。從pendingQueue中拿到Packet,Packet中的cb存儲了AsyncCallback,也就是異步 API 的結果回調。最後將Packet寫入到EventThread的waitingEvents中。
processEvent
processEvent是EventThread處理事件核心函數,核心邏輯如下:
- 如果event instanceof WatcherSetEventPair,取出pair中的Watchers,逐個調用watcher.process(pair.event)
- 否則event為AsyncCallback,根據p.response判斷為哪種響應類型,執行響應的回調processResult。
可見,Watcher和AsyncCallback都是由EventThread處理的,通過processEvent進行區分處理。
總結
Zookeeper 客戶端中Watcher和AsyncCallback都是異步回調的方式,但它們回調的時機是不一樣的,前者是由服務器發送事件觸發客戶端回調,後者是在執行了請求後得到響應後客戶端主動觸發的。它們的共同點在於都需要在獲取了服務器響應之後,由SendThread寫入EventThread的waitingEvents中,然後由EventThread逐個從事件隊列中獲取並處理。
喜歡的點個關注,點點贊,私信回覆‘666’有福利。
閱讀更多 JavaSpring高級進階 的文章