codis 源碼理解

codis 源碼理解

這裡介紹一下 codis 幾個主要的點,對理解源碼有幫助。

1. 先看 ServerGroup 和 Slot。

一個 Proxy 可以對應多個 ServerGroup;

ServerGroup 是一組 Codis Server,一個 ServerGroup 只有一個 Master(Codis Server),而且雖然有多個 Codis Server,Proxy 只訪問 ServerGroup 中的 Master,Slave 可用作故障切換;

Slot 是一個邏輯概念,一共 1024 個,使用 crc32(key) % 1024 計算 Slot id,而且一個(或多個) Slot 屬於一個 ServerGroup,1024 個 Slot 一起分用多個 ServerGroup 資源;

當 Slot 所在 ServerGroup 內存不夠的時候可以把此 Slot 遷移到另一個內存使用少的 ServerGroup,實現擴容的目的,而當所有 ServerGroup 內存不夠的時候增加新的 ServerGroup 即可。

ServerGroup 數據結構:

codis 源碼理解

Slot 數據結構:

codis 源碼理解
codis 源碼理解

2. 再看看 Router,Router 用來轉發 linstener 接收的 Codis 請求。

codis 源碼理解

最核心的是 SharedBackendConn 的數據結構:

codis 源碼理解
codis 源碼理解

再看看這裡 Slot 的數據結構:

codis 源碼理解

每一個 ServerGroup 有一個 Master (Codis Server),bc 只能是 Master (的連接)。

Proxy 啟動的時候,會去 fill Proxy 的 router,這部分代碼讀起來感覺怪怪的,其實就是對於 bc,首先建立到 bc.addr 的連接,然後先創建從 bc 讀取結果的 chan *Request(讀取之後會設置結果,以便發送給請求方),放入 goroutine 不斷讀取,最後循環從 input 中獲取請求,經過路由轉發之後寫入 bc。

說說路由轉發的邏輯,分為兩部分:

1. Proxy 接收請求和返回數據,這個實現類似上面說的 Proxy 和 Master 的交互;

2. Proxy 收到請求方的數據之後需要 decode,如果是類似 MGET 指令的話會分拆成多個 GET 請求,然後向 Master 發送請求;根據 crc32(key) % 1024 計算出 Slot id,然後走 Proxy 和 Master 的交互流程(ps,如果此 Slot 處在遷移狀態,那麼會先調用 SLOTSMGRTTAGONE 把 key 遷移至新 ServerGroup)。

3. 看看 Proxy。

codis 源碼理解

conf 是 Proxy 的配置數據;

topo 是 ZK 或 etcd 操作接口,裡面保存了 ProductName,一個 Proxy 實例只有一個 ProductName(就是一個服務);

groups 是 ServerGroup 信息,key 是 Slot id,value 是 ServerGroup id;

lastActionSeq 用於保存 action seq,其中 evtbus 保存 watch proxy 和 action 的 事件信息;

router 下面再講;

listener 是 Proxy 的 Listener;

kill、wait、stop 用來正確的處理退出邏輯。

proxy 的處理過程(不說 load 配置和如何處理退出的部分):

1). 先初始化 Router,主要是初始化 Router 中的 1024 個 Slot,只是把 Slot 的 id 標識出來 ;

2). 在 zk 中註冊 proxy 的臨時節點,節點路徑:/zk/codis/db_{productName}/proxy/{proxyId},內容是以 ProxyInfo 數據結構報錯的 proxy 信息;

3). 在 zk 中註冊 proxy fence 的永久節點,節點路徑:/zk/codis/db_{productName}/fence/{proxyAddr},內容為空。

為什麼有了 proxy 節點還需要 fence 節點呢,是為了來判斷 proxy 是否是正常退出的,比如使用 kill -9 殺 proxy 以後,proxy 節點會消失,fence 節點不會消失,對比下就知道是非正常退出。

proxy 在收到 kill 信號(os.Interrupt, syscall.SIGTERM, os.Kill)後會把 proxy fence 和 proxy 節點刪除,proxy 也就下線了,但是如果 kill -9 就不會刪除,需要手動刪除。

4). 此時還要等待 proxy 是在線狀態,這裡的邏輯是 proxy 剛啟動時候的狀態是 PROXY_STATE_OFFLINE(main.go 會調用 dashboard api 設置自己為 PROXY_STATE_ONLINE,為了保證 proxy 信息已經註冊到 zk,main.go 會等待一秒鐘再設置),一旦 proxy 是在線狀態之後,會開一個 goroutine rewatch proxy 狀態(zk 節點是 /zk/codis/db_{productName}/proxy),如果 proxy 有變化會通知到 evtbus 這個 channel 裡;

5). 開啟一個 goroutine 來 watch action 節點 /zk/codis/db_{productName}/actions 的 children,如果有變化也通知給 evtbus;

6). 下一步,fill Router 的 Slot,有兩部分,一部分是 fill Server 數據結構中的 groups,key 是 Slot id,value 是 Group id,另一部分是 fill Router 中的 Slot 中的 bc,Router 中的 pool 是連接池,Slot 從 pool 中取 bc;

7). 此時開始處理請求,使用 goroutine,拿到請求扔給 Router 部分來處理;

8). 開啟一個 loopEvents,如果檢查到有 kill 信號,刪除 zk 中的 proxy 節點,下線;而且從 evtbus 裡讀取事件,做處理;額外的,定時器,每隔一段事件 PING 一次 proxy 後端的 Codis Server,以保持探活。

4. proxy 之間如何協調。

第 3 步說了,proxy 會監聽 /zk/codis/db_{productName}/proxy 和 /zk/codis/db_{productName}/actions 的變化,codis 就是通過這兩個監聽機制保證 proxy 的信息一致。

/zk/codis/db_{productName}/proxy 主要是獲取 proxy 的狀態信息,如果狀態變成 PROXY_STATE_MARK_OFFLINE,則刪除 fence 節點和 proxy 節點,並在內存中標記狀態為 PROXY_STATE_MARK_OFFLINE,此處 loopEvents 會停止,然後觸發 serve() 的 s.close(),然後 handleConns 停止,serve() 停止,proxy 退出。

/zk/codis/db_{productName}/actions 則是監聽 slot、group 等的變化,比如:

ACTION_TYPE_SLOT_MIGRATE

ACTION_TYPE_SLOT_CHANGED

ACTION_TYPE_SLOT_PREMIGRATE

ACTION_TYPE_SERVER_GROUP_CHANGED

ACTION_TYPE_MULTI_SLOT_CHANGED

收到這些變化之後,從 zk 中拿新的信息,來 fill 內存中 Slot 中的信息,然後會創建 /zk/codis/db_{productName}/ActionResponse/{seq}/proxyId 來確認 proxy 已經響應此 action。

還有一點,新建 action 的時候有個開關:needConfirm,如果為真,則會確認 proxy node 和 fence node 一致,而且會等待所有 proxy 回覆了 action,如果有 proxy 沒回復,則設置此 proxy 為 PROXY_STATE_MARK_OFFLINE,並報錯。

5. 關於 Slot 遷移。

通過 dashboard api (/api/migrate) 來遷移,傳入的數據結構如下:

codis 源碼理解

然後把 From 到 To 的每個 slot 生成 MigrateTaskInfo。

codis 源碼理解

然後把 MigrateTaskInfo 推到 globalMigrateManager,dashboard 啟動的時候會初始化 globalMigrateManager,globalMigrateManager 數據結構如下:

codis 源碼理解

MigrateTask 結構如下:

codis 源碼理解

SlotMigrateProgress 結構如下:

codis 源碼理解

初始化 globalMigrateManager 會創建 /zk/codis/db_{productName}/migrate_tasks,然後進入執行遷移的 loop,不斷從 /zk/codis/db_{productName}/migrate_tasks 讀取任務並遷移。

globalMigrateManager 收到 MigrateTaskInfo 後會創建任務 /zk/codis/db_{productName}/migrate_tasks/{seq},內容就是 MigrateTaskInfo 信息,然後遷移 api 返回。

主要的處理在 loop 裡面:

1). 從 /zk/codis/db_{productName}/migrate_tasks/ 取出最早的 task,封裝成 MigrateTask;

2). 做遷移 check,檢查所有 slot,如果狀態是 SLOT_STATUS_MIGRATE 或者 SLOT_STATUS_PRE_MIGRATE 的數量大於1,報錯,如果等於1,判斷是否是此 MigrateTask 中的 slot,如果不是則報錯;

3). 修改 task 的狀態為 MIGRATE_TASK_MIGRATING (migrating);

4). 遷移 slot,在遷移之前要把 slot 的狀態改掉,如果原狀態不是 SLOT_STATUS_MIGRATE,改成 SLOT_STATUS_PRE_MIGRATE,之後強制把狀態改成 SLOT_STATUS_MIGRATE,而且修改 from group 和 to group。

5). 然後不斷地把源 group master 的數據向 目標 group master 拷貝,完成之後修改 slot 狀態為 SLOT_STATUS_ONLINE,from group 和 to group 為 INVALID_ID。

6). 遷移完之後會刪除 task,也就是刪除 zk /zk/codis/db_{productName}/migrate_tasks/{seq}。如果遷移失敗而且 slot 狀態為 SLOT_STATUS_PRE_MIGRATE(如果不是 SLOT_STATUS_PRE_MIGRATE,說明已經在遷移,需手動處理),會把 slot 狀態改為 SLOT_STATUS_ONLINE。

7). 額外重要的一點,每次更新 slot 狀態時,都會發起 slot 的 action,等待所有 proxy 回覆才繼續。而且 proxy 收到 slot 變化後,會更新 slot 狀態,如果 slot 在遷移狀態(根據 slot 的 migrate.bc 判斷)有訪問到達 proxy,會先把數據從 from group 拷到 to group,然後再從 to group 請求,這點銜接的挺好。


分享到:


相關文章: