高可用分佈式存儲 etcd 的實現原理

高可用分佈式存儲 etcd 的實現原理

這篇文章將會介紹 etcd 的實現原理,其中包括 Raft 協議、存儲兩大模塊,在最後我們也會簡單介紹 etcd 一些具體應用場景。

簡介

etcd 的官方將它定位成一個可信賴的分佈式鍵值存儲服務,它能夠為整個分佈式集群存儲一些關鍵數據,協助分佈式集群的正常運轉。


高可用分佈式存儲 etcd 的實現原理

我們可以簡單看一下 etcd 和 Zookeeper 在定義上有什麼不同:

  • etcd is a distributed reliable key-value store for the most critical data of a distributed system…
  • ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services.

其中前者是一個用於存儲關鍵數據的鍵值存儲,後者是一個用於管理配置等信息的中心化服務。

etcd 的使用其實非常簡單,它對外提供了 gRPC 接口,我們可以通過 Protobuf 和 gRPC 直接對 etcd 中存儲的數據進行管理,也可以使用官方提供的 etcdctl 操作存儲的數據。


高可用分佈式存儲 etcd 的實現原理

文章並不會展開介紹 etcd 的使用方法,這一小節將逐步介紹幾大核心模塊的實現原理,包括 etcd 使用 Raft 協議同步各個節點數據的過程以及 etcd 底層存儲數據使用的結構。

Raft

在每一個分佈式系統中,etcd 往往都扮演了非常重要的地位,由於很多服務配置發現以及配置的信息都存儲在 etcd 中,所以整個集群可用性的上限往往就是 etcd 的可用性,而使用 3 ~ 5 個 etcd 節點構成高可用的集群往往都是常規操作。


高可用分佈式存儲 etcd 的實現原理

正是因為 etcd 在使用的過程中會啟動多個節點,如何處理幾個節點之間的分佈式一致性就是一個比較有挑戰的問題了。

解決多個節點數據一致性的方案其實就是共識算法,在之前的文章中我們簡單介紹過 Zookeeper 使用的 Zab 協議 以及常見的 共識算法 Paxos 和 Raft,etcd 使用的共識算法就是 Raft,這一節我們將詳細介紹 Raft 以及 etcd 中 Raft 的一些實現細節。

介紹

Raft 從一開始就被設計成一個易於理解和實現的共識算法,它在容錯和性能上與 Paxos 協議比較類似,區別在於它將分佈式一致性的問題分解成了幾個子問題,然後一一進行解決。

每一個 Raft 集群中都包含多個服務器,在任意時刻,每一臺服務器只可能處於 Leader、Follower 以及 Candidate 三種狀態;在處於正常的狀態時,集群中只會存在一個 Leader,其餘的服務器都是 Follower。


高可用分佈式存儲 etcd 的實現原理

所有的 Follower 節點都是被動的,它們不會主動發出任何的請求,只會響應 Leader 和 Candidate 發出的請求,對於每一個用戶的可變操作,都會被路由給 Leader 節點進行處理,除了 Leader 和 Follower 節點之外,Candidate 節點其實只是集群運行過程中的一個臨時狀態。

Raft 集群中的時間也被切分成了不同的幾個任期(Term),每一個任期都會由 Leader 的選舉開始,選舉結束後就會進入正常操作的階段,直到 Leader 節點出現問題才會開始新一輪的選擇。


高可用分佈式存儲 etcd 的實現原理

我們將 Raft 協議分成三個子問題:節點選舉、日誌複製以及安全性,文章會以 etcd 為例介紹 Raft 協議是如何解決這三個子問題的。

節點選舉

使用 Raft 協議的 etcd 集群在啟動節點時,會遵循 Raft 協議的規則,所有節點一開始都被初始化為 Follower 狀態,新加入的節點會在 NewNode 中做一些配置的初始化,包括用於接收各種信息的 Channel:


高可用分佈式存儲 etcd 的實現原理

在做完這些初始化的節點和 Raft 配置的事情之後,就會進入一個由 for 和 select 組成的超大型循環,這個循環會從 Channel 中獲取待處理的事件:


高可用分佈式存儲 etcd 的實現原理

作者對整個循環內的代碼進行了簡化,因為當前只需要關心三個 Channel 中的消息,也就是用於接受其他節點消息的 recvc、用於觸發定時任務的 tickc 以及用於暫停當前節點的 stop。


高可用分佈式存儲 etcd 的實現原理

除了 stop Channel 中介紹到的消息之外,recvc 和 tickc 兩個 Channel 中介紹到事件時都會交給當前節點持有 Raft 結構體處理。

定時器與心跳

當節點從任意狀態(包括啟動)調用 becomeFollower 時,都會將節點的定時器設置為 tickElection:


高可用分佈式存儲 etcd 的實現原理

如果當前節點可以成為 Leader 並且上一次收到 Leader 節點的消息或者心跳已經超過了等待的時間,當前節點就會發送 MsgHup 消息嘗試開始新的選舉。

但是如果 Leader 節點正常運行,就能夠同樣通過它的定時器 tickHeartbeat 向所有的 Follower 節點廣播心跳請求,也就是 MsgBeat 類型的 RPC 消息:


高可用分佈式存儲 etcd 的實現原理

上述代碼段 Leader 節點中調用的 Step 函數,最終會調用 stepLeader 方法,該方法會根據消息的類型進行不同的處理:


高可用分佈式存儲 etcd 的實現原理

bcastHeartbeat 方法最終會向所有的 Follower 節點發送 MsgHeartbeat 類型的消息,通知它們目前 Leader 的存活狀態,重置所有 Follower 持有的超時計時器。


高可用分佈式存儲 etcd 的實現原理

作為集群中的 Follower,它們會在 stepFollower 方法中處理接收到的全部消息,包括 Leader 節點發送的心跳 RPC 消息:


高可用分佈式存儲 etcd 的實現原理

當 Follower 接受到了來自 Leader 的 RPC 消息 MsgHeartbeat 時,會將當前節點的選舉超時時間重置並通過 handleHeartbeat 向 Leader 節點發出響應 —— 通知 Leader 當前節點能夠正常運行。

而 Candidate 節點對於 MsgHeartBeat 消息的處理會稍有不同,它會先執行 becomeFollower 設置當前節點和 Raft 協議的配置:


高可用分佈式存儲 etcd 的實現原理

Follower 與 Candidate 會根據節點類型的不同做出不同的響應,兩者收到心跳請求時都會重置節點的選舉超時時間,不過後者會將節點的狀態直接轉變成 Follower:


高可用分佈式存儲 etcd 的實現原理

當 Leader 節點收到心跳的響應時就會將對應節點的狀態設置為 Active,如果 Follower 節點在一段時間內沒有收到來自 Leader 節點的消息就會嘗試發起競選。


高可用分佈式存儲 etcd 的實現原理

到了這裡,心跳機制就起到了作用開始發送 MsgHup 嘗試重置整個集群中的 Leader 節點,接下來我們就會開始分析 Raft 協議中的競選流程了。

競選流程

如果集群中的某一個 Follower 節點長時間內沒有收到來自 Leader 的心跳請求,當前節點就會通過 MsgHup 消息進入預選舉或者選舉的流程。


高可用分佈式存儲 etcd 的實現原理

如果收到 MsgHup 消息的節點不是 Leader 狀態,就會根據當前集群的配置選擇進入 PreElection 或者 Election階段,PreElection 階段並不會真正增加當前節點的 Term,它的主要作用是得到當前集群能否成功選舉出一個 Leader 的答案,如果當前集群中只有兩個節點而且沒有預選舉階段,那麼這兩個節點的 Term 會無休止的增加,預選舉階段就是為了解決這一問題而出現的。


高可用分佈式存儲 etcd 的實現原理

在這裡不會討論預選舉的過程,而是將目光主要放在選舉階段,具體瞭解一下使用 Raft 協議的 etcd 集群是如何從眾多節點中選出 Leader 節點的。

我們可以繼續來分析 campaign 方法的具體實現,下面就是刪去預選舉相關邏輯後的代碼:

<code>// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L730-766
func (r *raft) campaign(t CampaignType) {
\tr.becomeCandidate()
\t
\tif r.quorum() == r.poll(r.id, voteRespMsgType(voteMsg), true) {
\t\tr.becomeLeader()
\t\treturn
\t}
\tfor id := range r.prs {
\t\tif id == r.id {
\t\t\tcontinue
\t\t}

\t\tr.send(pb.Message{Term: r.Term, To: id, Type: pb.MsgVote, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
\t}
}/<code>

當前節點會立刻調用 becomeCandidate 將當前節點的 Raft 狀態變成候選人;在這之後,它會將票投給自己,如果當前集群只有一個節點,該節點就會直接成為集群中的 Leader 節點。

如果集群中存在了多個節點,就會向集群中的其他節點發出 MsgVote 消息,請求其他節點投票,在 Step 函數中包含不同狀態的節點接收到消息時的響應:


高可用分佈式存儲 etcd 的實現原理

如果當前節點投的票就是消息的來源或者當前節點沒有投票也沒有 Leader,那麼就會向來源的節點投票,否則就會通知該節點當前節點拒絕投票。


高可用分佈式存儲 etcd 的實現原理

在 stepCandidate 方法中,候選人節點會處理來自其他節點的投票響應消息,也就是 MsgVoteResp:

高可用分佈式存儲 etcd 的實現原理

每當收到一個 MsgVoteResp 類型的消息時,就會設置當前節點持有的 votes 數組,更新其中存儲的節點投票狀態並返回投『同意』票的人數,如果獲得的票數大於法定人數 quorum,當前節點就會成為集群的 Leader 並向其他的節點發送當前節點當選的消息,通知其餘節點更新 Raft 結構體中的 Term 等信息。

節點狀態

對於每一個節點來說,它們根據不同的節點狀態會對網絡層發來的消息做出不同的響應,我們會分別介紹下面的四種狀態在 Raft 中對於配置和消息究竟是如何處理的。


高可用分佈式存儲 etcd 的實現原理

對於每一個 Raft 的節點狀態來說,它們分別有三個比較重要的區別,其中一個是在改變狀態時調用 becomeLeader、becomeCandidate、becomeFollower 和 becomePreCandidate 方法改變 Raft 狀態有比較大的不同,第二是處理消息時調用 stepLeader、stepCandidate 和 stepFollower 時有比較大的不同,最後是幾種不同狀態的節點具有功能不同的定時任務。

對於方法的詳細處理,我們在這一節中不詳細介紹和分析,如果一個節點的狀態是 Follower,那麼當前節點切換到 Follower 一定會通過 becomeFollower 函數,在這個函數中會重置節點持有任期,並且設置處理消息的函數為 stepFollower:


高可用分佈式存儲 etcd 的實現原理

除此之外,它還會設置一個用於在 Leader 節點宕機時觸發選舉的定時器 tickElection。

Candidate 狀態的節點與 Follower 的配置差不了太多,只是在消息處理函數 step、任期以及狀態上的設置有一些比較小的區別:


高可用分佈式存儲 etcd 的實現原理

最後的 Leader 就與這兩者有其他的區別了,它不僅設置了處理消息的函數 step 而且設置了與其他狀態完全不同的 tick 函數:


高可用分佈式存儲 etcd 的實現原理

這裡的 tick 函數 tickHeartbeat 每隔一段時間會通過 Step 方法向集群中的其他節點發送 MsgBeat 消息:

高可用分佈式存儲 etcd 的實現原理

上述代碼中的 MsgBeat 消息會在 Step 中被轉換成 MsgHeartbeat 最終發送給其他的節點,Leader 節點超時之後的選舉流程我們在前兩節中也已經介紹過了,在這裡就不再重複了。

存儲

etcd 目前支持 V2 和 V3 兩個大版本,這兩個版本在實現上有比較大的不同,一方面是對外提供接口的方式,另一方面就是底層的存儲引擎,V2 版本的實例是一個純內存的實現,所有的數據都沒有存儲在磁盤上,而 V3 版本的實例就支持了數據的持久化。


高可用分佈式存儲 etcd 的實現原理

在這一節中,我們會介紹 V3 版本的 etcd 究竟是通過什麼樣的方式存儲用戶數據的。

後端

在 V3 版本的設計中,etcd 通過 backend 後端這一設計,很好地封裝了存儲引擎的實現細節,為上層提供一個更一致的接口,對於 etcd 的其他模塊來說,它們可以將更多注意力放在接口中的約定上,不過在這裡,我們更關注的是 etcd 對 Backend 接口的實現。


高可用分佈式存儲 etcd 的實現原理

etcd 底層默認使用的是開源的嵌入式鍵值存儲數據庫 bolt,但是這個項目目前的狀態已經是歸檔不再維護了,如果想要使用這個項目可以使用 CoreOS 的 bbolt 版本。

高可用分佈式存儲 etcd 的實現原理

這一小節中,我們會簡單介紹 etcd 是如何使用 BoltDB 作為底層存儲的,首先可以先來看一下 pacakge 內部的 backend 結構體,這是一個實現了 Backend 接口的結構:

高可用分佈式存儲 etcd 的實現原理

從結構體的成員 db 我們就可以看出,它使用了 BoltDB 作為底層存儲,另外的兩個 readTx 和 batchTx 分別實現了 ReadTx 和 BatchTx 接口:

高可用分佈式存儲 etcd 的實現原理

從這兩個接口的定義,我們不難發現它們能夠對外提供數據庫的讀寫操作,而 Backend 就能對這兩者提供的方法進行封裝,為上層屏蔽存儲的具體實現:

高可用分佈式存儲 etcd 的實現原理

每當我們使用 newBackend 創建一個新的 backend 結構時,都會創建一個 readTx 和 batchTx 結構體,這兩者一個負責處理只讀請求,一個負責處理讀寫請求:

高可用分佈式存儲 etcd 的實現原理

當我們在 newBackend 中進行了初始化 BoltDB、事務等工作後,就會開一個 goroutine 異步的對所有批量讀寫事務進行定時提交:

高可用分佈式存儲 etcd 的實現原理

對於上層來說,backend 其實只是對底層存儲的一個抽象,很多時候並不會直接跟它打交道,往往都是使用它持有的 ReadTx 和 BatchTx 與數據庫進行交互。

只讀事務

目前大多數的數據庫對於只讀類型的事務並沒有那麼多的限制,尤其是在使用了 MVCC 之後,所有的只讀請求幾乎不會被寫請求鎖住,這大大提升了讀的效率,由於在 BoltDB 的同一個 goroutine 中開啟兩個相互依賴的只讀事務和讀寫事務會發生死鎖,為了避免這種情況我們還是引入了 sync.RWLock 保證死鎖不會出現:


高可用分佈式存儲 etcd 的實現原理

你可以看到在整個結構體中,除了用於保護 tx 的 txmu 讀寫鎖之外,還存在另外一個 mu 讀寫鎖,它的作用是保證 buf 中的數據不會出現問題,buf 和結構體中的 buckets 都是用於加速讀效率的緩存。

高可用分佈式存儲 etcd 的實現原理

對於一個只讀事務來說,它對上層提供了兩個獲取存儲引擎中數據的接口,分別是 UnsafeRange 和 UnsafeForEach,在這裡會重點介紹前面方法的實現細節:

高可用分佈式存儲 etcd 的實現原理

上述代碼中省略了加鎖保護讀緩存以及 Bucket 中存儲數據的合法性,也省去了一些參數的檢查,不過方法的整體接口還是沒有太多變化,UnsafeRange 會先從自己持有的緩存 txReadBuffer 中讀取數據,如果數據不能夠滿足調用者的需求,就會從 buckets 緩存中查找對應的 BoltDB bucket 並從 BoltDB 數據庫中讀取。

<code>// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/batch_tx.go#L121-141
func unsafeRange(c *bolt.Cursor, key, endKey []byte, limit int64) (keys [][]byte, vs [][]byte) {
\tvar isMatch func(b []byte) bool
\tif len(endKey) > 0 {
\t\tisMatch = func(b []byte) bool { return bytes.Compare(b, endKey) < 0 }
\t} else {
\t\tisMatch = func(b []byte) bool { return bytes.Equal(b, key) }
\t\tlimit = 1
\t}

\tfor ck, cv := c.Seek(key); ck != nil && isMatch(ck); ck, cv = c.Next() {
\t\tvs = append(vs, cv)
\t\tkeys = append(keys, ck)
\t\tif limit == int64(len(keys)) {
\t\t\tbreak
\t\t}
\t}
\treturn keys, vs
}/<code>

這個包內部的函數 unsafeRange 實際上通過 BoltDB 中的遊標來遍歷滿足查詢條件的鍵值對。

到這裡為止,整個只讀事務提供的接口就基本介紹完了,在 etcd 中無論我們想要後去單個 Key 還是一個範圍內的 Key 最終都是通過 Range 來實現的,這其實也是隻讀事務的最主要功能。

讀寫事務

只讀事務只提供了讀數據的能力,包括 UnsafeRange 和 UnsafeForeach,而讀寫事務 BatchTx 提供的就是讀和寫數據的能力了:


高可用分佈式存儲 etcd 的實現原理

讀寫事務同時提供了不帶緩存的 batchTx 實現以及帶緩存的 batchTxBuffered 實現,後者其實『繼承了』前者的結構體,並額外加入了緩存 txWriteBuffer 加速讀請求:

<code>// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/batch_tx.go#L243-246
type batchTxBuffered struct {
\tbatchTx
\tbuf txWriteBuffer
}/<code>

後者在實現接口規定的方法時,會直接調用 batchTx 的同名方法,並將操作造成的副作用的寫入的緩存中,在這裡我們並不會展開介紹這一版本的實現,還是以分析 batchTx 的方法為主。

當我們向 etcd 中寫入數據時,最終都會調用 batchTx 的 unsafePut 方法將數據寫入到 BoltDB 中:

<code>// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/batch_tx.go#L65-67
func (t *batchTx) UnsafePut(bucketName []byte, key []byte, value []byte) {
\tt.unsafePut(bucketName, key, value, false)
}

// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/batch_tx.go#L74-103
func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq bool) {
\tbucket := t.tx.Bucket(bucketName)
\tif err := bucket.Put(key, value); err != nil {
\t\tplog.Fatalf("cannot put key into bucket (%v)", err)
\t}
\tt.pending++
}/<code>

這兩個方法的實現非常清晰,作者覺得他們都並不值得展開詳細介紹,只是調用了 BoltDB 提供的 API 操作一下 bucket 中的數據,而另一個刪除方法的實現與這個也差不多:

<code>// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/batch_tx.go#L144-169
func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) {
\tbucket := t.tx.Bucket(bucketName)
\terr := bucket.Delete(key)
\tif err != nil {
\t\tplog.Fatalf("cannot delete key from bucket (%v)", err)
\t}
\tt.pending++
}/<code>

它們都是通過 Bolt.Tx 找到對應的 Bucket,然後做出相應的增刪操作,但是這寫請求在這兩個方法執行後其實並沒有提交,我們還需要手動或者等待 etcd 自動將請求提交:

<code>// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/batch_tx.go#L184-188
func (t *batchTx) Commit() {
\tt.Lock()
\tt.commit(false)
\tt.Unlock()
}

// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/batch_tx.go#L210-241
func (t *batchTx) commit(stop bool) {
\tif t.tx != nil {
\t\tif t.pending == 0 && !stop {
\t\t\treturn
\t\t}

\t\tstart := time.Now()

\t\terr := t.tx.Commit()

\t\trebalanceSec.Observe(t.tx.Stats().RebalanceTime.Seconds())
\t\tspillSec.Observe(t.tx.Stats().SpillTime.Seconds())
\t\twriteSec.Observe(t.tx.Stats().WriteTime.Seconds())
\t\tcommitSec.Observe(time.Since(start).Seconds())
\t\tatomic.AddInt64(&t.backend.commits, 1)

\t\tt.pending = 0
\t}
\tif !stop {
\t\tt.tx = t.backend.begin(true)
\t}
}/<code>

在每次調用 Commit 對讀寫事務進行提交時,都會先檢查是否有等待中的事務,然後會將數據上報至 Prometheus 中,其他的服務就可以將 Prometheus 作為數據源對 etcd 的執行狀況進行監控了。

索引

經常使用 etcd 的開發者可能會了解到,它本身對於每一個鍵值對都有一個 revision 的概念,鍵值對的每一次變化都會被 BoltDB 單獨記錄下來,所以想要在存儲引擎中獲取某一個 Key 對應的值,要先獲取 revision,再通過它才能找到對應的值,在裡我們想要介紹的其實是 etcd 如何管理和存儲一個 Key 的多個 revision 記錄。

高可用分佈式存儲 etcd 的實現原理

在 etcd 服務中有一個用於存儲所有的鍵值對 revision 信息的 btree,我們可以通過 index 的 Get 接口獲取一個 Key 對應 Revision 的值:

<code>// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/index.go#L68-76
func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created revision, ver int64, err error) {
\tkeyi := &keyIndex{key: key}
\tif keyi = ti.keyIndex(keyi); keyi == nil {
\t\treturn revision{}, revision{}, 0, ErrRevisionNotFound
\t}
\treturn keyi.get(ti.lg, atRev)
}/<code>

上述方法通過 keyIndex 方法查找 Key 對應的 keyIndex 結構體,這裡使用的內存結構體 btree 是 Google 實現的一個版本:

<code>// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/index.go#L84-89
func (ti *treeIndex) keyIndex(keyi *keyIndex) *keyIndex {
\tif item := ti.tree.Get(keyi); item != nil {
\t\treturn item.(*keyIndex)
\t}
\treturn nil
}/<code>

可以看到這裡的實現非常簡單,只是從 treeIndex 持有的成員 btree 中查找 keyIndex,將結果強制轉換成 keyIndex 類型後返回;獲取 Key 對應 revision 的方式也非常簡單:

<code>// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/key_index.go#L149-171
func (ki *keyIndex) get(lg *zap.Logger, atRev int64) (modified, created revision, ver int64, err error) {
\tg := ki.findGeneration(atRev)
\tif g.isEmpty() {
\t\treturn revision{}, revision{}, 0, ErrRevisionNotFound
\t}

\tn := g.walk(func(rev revision) bool { return rev.main > atRev })
\tif n != -1 {
\t\treturn g.revs[n], g.created, g.ver - int64(len(g.revs)-n-1), nil
\t}

\treturn revision{}, revision{}, 0, ErrRevisionNotFound

}/<code>

KeyIndex

在我們具體介紹方法實現的細節之前,首先我們需要理解 keyIndex 包含的字段以及管理同一個 Key 不同版本的方式:


高可用分佈式存儲 etcd 的實現原理

每一個 keyIndex 結構體中都包含當前鍵的值以及最後一次修改對應的 revision 信息,其中還保存了一個 Key 的多個 generation,每一個 generation 都會記錄當前 Key『從生到死』的全部過程,每當一個 Key 被刪除時都會調用 timestone 方法向當前的 generation 中追加一個新的墓碑版本:

<code>// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/key_index.go#L127-145
func (ki *keyIndex) tombstone(lg *zap.Logger, main int64, sub int64) error {
\tif ki.generations[len(ki.generations)-1].isEmpty() {
\t\treturn ErrRevisionNotFound
\t}
\tki.put(lg, main, sub)
\tki.generations = append(ki.generations, generation{})
\treturn nil
}/<code>

這個 tombstone 版本標識這當前的 Key 已經被刪除了,但是在每次刪除一個 Key 之後,就會在當前的 keyIndex 中創建一個新的 generation 結構用於存儲新的版本信息,其中 ver 記錄當前 generation 包含的修改次數,created 記錄創建 generation 時的 revision 版本,最後的 revs 用於存儲所有的版本信息。

讀操作

etcd 中所有的查詢請求,無論是查詢一個還是多個、是數量還是鍵值對,最終都會調用 rangeKeys 方法:

<code>// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/kvstore_txn.go#L112-165
func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions) (*RangeResult, error) {
\trev := ro.Rev

\trevpairs := tr.s.kvindex.Revisions(key, end, rev)
\tif len(revpairs) == 0 {
\t\treturn &RangeResult{KVs: nil, Count: 0, Rev: curRev}, nil
\t}

\tkvs := make([]mvccpb.KeyValue, int(ro.Limit))
\trevBytes := newRevBytes()
\tfor i, revpair := range revpairs[:len(kvs)] {
\t\trevToBytes(revpair, revBytes)
\t\t_, vs := tr.tx.UnsafeRange(keyBucketName, revBytes, nil, 0)
\t\tkvs[i].Unmarshal(vs[0])
\t}
\treturn &RangeResult{KVs: kvs, Count: len(revpairs), Rev: curRev}, nil
}/<code>

為了獲取一個範圍內的所有鍵值對,我們首先需要通過 Revisions 函數從 btree 中獲取範圍內所有的 keyIndex:

<code>// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/index.go#L106-120
func (ti *treeIndex) Revisions(key, end []byte, atRev int64) (revs []revision) {
\tif end == nil {
\t\trev, _, _, err := ti.Get(key, atRev)
\t\tif err != nil {
\t\t\treturn nil
\t\t}
\t\treturn []revision{rev}
\t}
\tti.visit(key, end, func(ki *keyIndex) {
\t\tif rev, _, _, err := ki.get(ti.lg, atRev); err == nil {
\t\t\trevs = append(revs, rev)
\t\t}
\t})
\treturn revs
}/<code>

如果只需要獲取一個 Key 對應的版本,就是直接使用 treeIndex 的方法,但是當上述方法會從 btree 索引中獲取一個連續多個 revision 值時,就會調用 keyIndex.get 來遍歷整顆樹並選取合適的版本:

<code>func (ki *keyIndex) get(lg *zap.Logger, atRev int64) (modified, created revision, ver int64, err error) {
\tg := ki.findGeneration(atRev)
\tif g.isEmpty() {
\t\treturn revision{}, revision{}, 0, ErrRevisionNotFound
\t}

\tn := g.walk(func(rev revision) bool { return rev.main > atRev })
\tif n != -1 {
\t\treturn g.revs[n], g.created, g.ver - int64(len(g.revs)-n-1), nil
\t}

\treturn revision{}, revision{}, 0, ErrRevisionNotFound
}/<code>

因為每一個 Key 的 keyIndex 中其實都存儲著多個 generation,我們需要根據傳入的參數返回合適的 generation並從其中返回主版本大於 atRev 的 revision 結構。

對於上層的鍵值存儲來說,它會利用這裡返回的 revision 從真正存儲數據的 BoltDB 中查詢當前 Key 對應 revision 的結果。

寫操作

當我們向 etcd 中插入數據時,會使用傳入的 key 構建一個 keyIndex 結構體並從樹中獲取相關版本等信息:

<code>// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/index.go#L53-66
func (ti *treeIndex) Put(key []byte, rev revision) {
\tkeyi := &keyIndex{key: key}

\titem := ti.tree.Get(keyi)
\tif item == nil {
\t\tkeyi.put(ti.lg, rev.main, rev.sub)
\t\tti.tree.ReplaceOrInsert(keyi)
\t\treturn
\t}
\tokeyi := item.(*keyIndex)
\tokeyi.put(ti.lg, rev.main, rev.sub)
}/<code>

treeIndex.Put 在獲取內存中的 keyIndex 結構之後會通過 keyIndex.put 其中加入新的 revision:

<code>// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/key_index.go#L77-104
func (ki *keyIndex) put(lg *zap.Logger, main int64, sub int64) {
\trev := revision{main: main, sub: sub}

\tif len(ki.generations) == 0 {
\t\tki.generations = append(ki.generations, generation{})
\t}
\tg := &ki.generations[len(ki.generations)-1]
\tif len(g.revs) == 0 {
\t\tg.created = rev
\t}
\tg.revs = append(g.revs, rev)
\tg.ver++
\tki.modified = rev
}/<code>

每一個新 revision 結構體寫入 keyIndex 時,都會改變當前 generation 的 created 和 ver 等參數,從這個方法中我們就可以瞭解到 generation 中的各個成員都是如何被寫入的。

寫入的操作除了增加之外,刪除某一個 Key 的函數也會經常被調用:

<code>// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/kvstore_txn.go#L252-309
func (tw *storeTxnWrite) delete(key []byte) {
\tibytes := newRevBytes()
\tidxRev := revision{main: tw.beginRev + 1, sub: int64(len(tw.changes))}
\trevToBytes(idxRev, ibytes)

\tibytes = appendMarkTombstone(tw.storeTxnRead.s.lg, ibytes)

\tkv := mvccpb.KeyValue{Key: key}

\td, _ := kv.Marshal()

\ttw.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
\ttw.s.kvindex.Tombstone(key, idxRev)
\ttw.changes = append(tw.changes, kv)
}/<code>

正如我們在文章前面所介紹的,刪除操作會向結構體中的 generation 追加一個新的 tombstone 標記,用於標識當前的 Key 已經被刪除;除此之外,上述方法還會將每一個更新操作的 revision 存到單獨的 keyBucketName 中。

索引的恢復

因為在 etcd 中,所有的 keyIndex 都是在內存的 btree 中存儲的,所以在啟動服務時需要從 BoltDB 中將所有的數據都加載到內存中,在這裡就會初始化一個新的 btree 索引,然後調用 restore 方法開始恢復索引:

<code>// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/kvstore.go#L321-433
func (s *store) restore() error {
\tmin, max := newRevBytes(), newRevBytes()
\trevToBytes(revision{main: 1}, min)
\trevToBytes(revision{main: math.MaxInt64, sub: math.MaxInt64}, max)

\ttx := s.b.BatchTx()


\trkvc, revc := restoreIntoIndex(s.lg, s.kvindex)
\tfor {
\t\tkeys, vals := tx.UnsafeRange(keyBucketName, min, max, int64(restoreChunkKeys))
\t\tif len(keys) == 0 {
\t\t\tbreak
\t\t}
\t\trestoreChunk(s.lg, rkvc, keys, vals, keyToLease)
\t\tnewMin := bytesToRev(keys[len(keys)-1][:revBytesLen])
\t\tnewMin.sub++
\t\trevToBytes(newMin, min)
\t}
\tclose(rkvc)
\ts.currentRev =
\treturn nil
}/<code>

在恢復索引的過程中,有一個用於遍歷不同鍵值的『生產者』循環,其中由 UnsafeRange 和 restoreChunk 兩個方法構成,這兩個方法會從 BoltDB 中遍歷數據,然後將鍵值對傳到 rkvc 中,交給 restoreIntoIndex 方法中創建的 goroutine 處理:

<code>// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/kvstore.go#L486-506
func restoreChunk(lg *zap.Logger, kvc chan\tfor i, key := range keys {
\t\trkv := r evKeyValue{key: key}
\t\t_ := rkv.kv.Unmarshal(vals[i])
\t\trkv.kstr = string(rkv.kv.Key)
\t\tif isTombstone(key) {
\t\t\tdelete(keyToLease, rkv.kstr)
\t\t} else if lid := lease.LeaseID(rkv.kv.Lease); lid != lease.NoLease {
\t\t\tkeyToLease[rkv.kstr] = lid
\t\t} else {
\t\t\tdelete(keyToLease, rkv.kstr)
\t\t}
\t\tkvc \t}
}/<code>

先被調用的 restoreIntoIndex 方法會創建一個用於接受鍵值對的 Channel,在這之後會在一個 goroutine 中處理從 Channel 接收到的數據,並將這些數據恢復到內存裡的 btree 中:

<code>// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/kvstore.go#L441-484
func restoreIntoIndex(lg *zap.Logger, idx index) (chan\trkvc, revc := make(chan revKeyValue, restoreChunkKeys), make(chan int64, 1)

\tgo func() {
\t\tcurrentRev := int64(1)
\t\tdefer func() { revc \t\tfor rkv := range rkvc {
\t\t\tki = &keyIndex{key: rkv.kv.Key}
\t\t\tki := idx.KeyIndex(ki)

\t\t\trev := bytesToRev(rkv.key)
\t\t\tcurrentRev = rev.main
\t\t\tif ok {
\t\t\t\tif isTombstone(rkv.key) {
\t\t\t\t\tki.tombstone(lg, rev.main, rev.sub)
\t\t\t\t\tcontinue
\t\t\t\t}
\t\t\t\tki.put(lg, rev.main, rev.sub)
\t\t\t} else if !isTombstone(rkv.key) {
\t\t\t\tki.restore(lg, revision{rkv.kv.CreateRevision, 0}, rev, rkv.kv.Version)
\t\t\t\tidx.Insert(ki)
\t\t\t}
\t\t}
\t}()
\treturn rkvc, revc
}/<code>

恢復內存索引的相關代碼在實現上非常值得學習,兩個不同的函數通過 Channel 進行通信並使用 goroutine 處理任務,能夠很好地將消息的『生產者』和『消費者』進行分離。


高可用分佈式存儲 etcd 的實現原理

Channel 作為整個恢復索引邏輯的一個消息中心,它將遍歷 BoltDB 中的數據和恢復索引兩部分代碼進行了分離。

存儲

etcd 的 mvcc 模塊對外直接提供了兩種不同的訪問方式,一種是鍵值存儲 kvstore,另一種是 watchableStore 它們都實現了包內公開的 KV 接口:

<code>

kvstore

對於 kvstore 來說,其實沒有太多值得展開介紹的地方,它利用底層的 BoltDB 等基礎設施為上層提供最常見的增傷改查,它組合了下層的 readTx、batchTx 等結構體,將一些線程不安全的操作變成線程安全的。

<code>// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/kvstore_txn.go#L32-40
func (s *store) Read() TxnRead {
\ts.mu.RLock()
\ttx := s.b.ReadTx()
\ts.revMu.RLock()
\ttx.Lock()
\tfirstRev, rev := s.compactMainRev, s.currentRev
\ts.revMu.RUnlock()
\treturn newMetricsTxnRead(&storeTxnRead{s, tx, firstRev, rev})
}/<code>

它也負責對內存中 btree 索引的維護以及壓縮一些無用或者不常用的數據,幾個對外的接口 Read、Write 就是對 readTx、batchTx 等結構體的組合並將它們的接口暴露給其他的模塊。

watchableStore

另外一個比較有意思的存儲就是 watchableStore 了,它是 mvcc 模塊為外界提供 Watch 功能的接口,它負責了註冊、管理以及觸發 Watcher 的功能,我們先來看一下這個結構體的各個字段:

<code>// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/watchable_store.go#L45-65
type watchableStore struct {
\t*store

\tmu sync.RWMutex

\tunsynced watcherGroup
\tsynced watcherGroup

\tstopc chan struct{}
\twg sync.WaitGroup
}/<code>

每一個 watchableStore 其實都組合了來自 store 結構體的字段和方法,除此之外,還有兩個 watcherGroup 類型的字段,其中 unsynced 用於存儲未同步完成的實例,synced 用於存儲已經同步完成的實例。

在初始化一個新的 watchableStore 時,我們會創建一個用於同步watcherGroup 的 Goroutine,在 syncWatchersLoop 這個循環中會每隔 100ms 調用一次 syncWatchers 方法,將所有未通知的事件通知給所有的監聽者,這可以說是整個模塊的核心:

<code>func (s *watchableStore) syncWatchers() int {
\tcurRev := s.store.currentRev
\tcompactionRev := s.store.compactMainRev

\twg, minRev := s.unsynced.choose(maxWatchersPerSync, curRev, compactionRev)
\tminBytes, maxBytes := newRevBytes(), newRevBytes()
\trevToBytes(revision{main: minRev}, minBytes)
\trevToBytes(revision{main: curRev + 1}, maxBytes)

\ttx := s.store.b.ReadTx()
\trevs, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0)
\tevs := kvsToEvents(nil, wg, revs, vs)

\twb := newWatcherBatch(wg, evs)
\tfor w := range wg.watchers {
\t\tw.minRev = curRev + 1

\t\teb, ok := wb[w]
\t\tif !ok {
\t\t\ts.synced.add(w)
\t\t\ts.unsynced.delete(w)
\t\t\tcontinue
\t\t}

\t\tw.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: curRev})

\t\ts.synced.add(w)
\t\ts.unsynced.delete(w)
\t}

\treturn s.unsynced.size()
}/<code>

簡化後的 syncWatchers 方法中總共做了三件事情,首先是根據當前的版本從未同步的 watcherGroup 中選出一些待處理的任務,然後從 BoltDB 中後去當前版本範圍內的數據變更並將它們轉換成事件,事件和 watcherGroup 在打包之後會通過 send 方法發送到每一個 watcher 對應的 Channel 中。


高可用分佈式存儲 etcd 的實現原理

上述圖片中展示了 mvcc 模塊對於向外界提供的監聽某個 Key 和範圍的接口,外部的其他模塊會通過 watchStream.watch 函數與模塊內部進行交互,每一次調用 watch 方法最終都會向 watchableStore 持有的 watcherGroup 中添加新的 watcher 結構。

<code>// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/watcher.go#L108-135
func (ws *watchStream) Watch(id WatchID, key, end []byte, startRev int64, fcs ...FilterFunc) (WatchID, error) {
\tif id == AutoWatchID {
\t\tfor ws.watchers[ws.nextID] != nil {
\t\t\tws.nextID++
\t\t}
\t\tid = ws.nextID
\t\tws.nextID++
\t}

\tw, c := ws.watchable.watch(key, end, startRev, id, ws.ch, fcs...)

\tws.cancels[id] = c
\tws.watchers[id] = w
\treturn id, nil
}

// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/watchable_store.go#L111-142
func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch chan\twa := &watcher{
\t\tkey: key,
\t\tend: end,
\t\tminRev: startRev,
\t\tid: id,
\t\tch: ch,
\t\tfcs: fcs,
\t}

\tsynced := startRev > s.store.currentRev || startRev == 0
\tif synced {
\t\ts.synced.add(wa)
\t} else {
\t\ts.unsynced.add(wa)
\t}

\treturn wa, func() { s.cancelWatcher(wa) }
}/<code>

當 etcd 服務啟動時,會在服務端運行一個用於處理監聽事件的 watchServer gRPC 服務,客戶端的 Watch 請求最終都會被轉發到這個服務的 Watch 函數中:

<code>// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/etcdserver/api/v3rpc/watch.go#L136-206
func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
\tsws := serverWatchStream{
\t\t// ...
\t\tgRPCStream: stream,
\t\twatchStream: ws.watchable.NewWatchStream(),
\t\tctrlStream: make(chan *pb.WatchResponse, ctrlStreamBufLen),
\t}

\tsws.wg.Add(1)
\tgo func() {
\t\tsws.sendLoop()
\t\tsws.wg.Done()
\t}()

\tgo func() {
\t\tsws.recvLoop()
\t}()

\tsws.wg.Wait()
\treturn err
}/<code>

當客戶端想要通過 Watch 結果監聽某一個 Key 或者一個範圍的變動,在每一次客戶端調用服務端上述方式都會創建兩個 Goroutine,這兩個協程一個會負責向監聽者發送數據變動的事件,另一個協程會負責處理客戶端發來的事件。

<code>// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/etcdserver/api/v3rpc/watch.go#L220-334 
func (sws *serverWatchStream) recvLoop() error {
\tfor {
\t\treq, err := sws.gRPCStream.Recv()
\t\tif err == io.EOF {
\t\t\treturn nil
\t\t}
\t\tif err != nil {
\t\t\treturn err
\t\t}

\t\tswitch uv := req.RequestUnion.(type) {
\t\tcase *pb.WatchRequest_CreateRequest:
\t\t\tcreq := uv.CreateRequest

\t\t\tfilters := FiltersFromRequest(creq)
\t\t\twsrev := sws.watchStream.Rev()
\t\t\trev := creq.StartRevision
\t\t\tid, _ := sws.watchStream.Watch(mvcc.WatchID(creq.WatchId), creq.Key, creq.RangeEnd, rev, filters...)
\t\t\twr := &pb.WatchResponse{
\t\t\t\tHeader: sws.newResponseHeader(wsrev),
\t\t\t\tWatchId: int64(id),
\t\t\t\tCreated: true,
\t\t\t\tCanceled: err != nil,
\t\t\t}
\t\t\tselect {
\t\t\tcase sws.ctrlStream \t\t\tcase \t\t\t\treturn nil
\t\t\t}

\t\tcase *pb.WatchRequest_CancelRequest: // ...
\t\tcase *pb.WatchRequest_ProgressRequest: // ...
\t\tdefault:
\t\t\tcontinue
\t\t}
\t}
}/<code>

在用於處理客戶端的 recvLoop 方法中調用了 mvcc 模塊暴露出的 watchStream.Watch 方法,該方法會返回一個可以用於取消監聽事件的 watchID;當 gRPC 流已經結束後者出現錯誤時,當前的循環就會返回,兩個 Goroutine 也都會結束。

如果出現了更新或者刪除事件,就會被髮送到 watchStream 持有的 Channel 中,而 sendLoop 會通過 select 來監聽多個 Channel 中的數據並將接收到的數據封裝成 pb.WatchResponse 結構並通過 gRPC 流發送給客戶端:

<code>func (sws *serverWatchStream) sendLoop() {
\tfor {
\t\tselect {
\t\tcase wresp, ok := \t\t\tevs := wresp.Events
\t\t\tevents := make([]*mvccpb.Event, len(evs))
\t\t\tfor i := range evs {
\t\t\t\tevents[i] = &evs[i]\t\t\t}

\t\t\tcanceled := wresp.CompactRevision != 0
\t\t\twr := &pb.WatchResponse{
\t\t\t\tHeader: sws.newResponseHeader(wresp.Revision),

\t\t\t\tWatchId: int64(wresp.WatchID),
\t\t\t\tEvents: events,
\t\t\t\tCompactRevision: wresp.CompactRevision,
\t\t\t\tCanceled: canceled,
\t\t\t}

\t\t\tsws.gRPCStream.Send(wr)

\t\tcase c, ok := \t\tcase \t\tcase \t\t\treturn
\t\t}
\t}
}/<code>

對於每一個 Watch 請求來說,watchServer 會根據請求創建兩個用於處理當前請求的 Goroutine,這兩個協程會與更底層的 mvcc 模塊協作提供監聽和回調功能:


高可用分佈式存儲 etcd 的實現原理

到這裡,我們對於 Watch 功能的介紹就差不多結束了,從對外提供的接口到底層的使用的數據結構以及具體實現,其他與 Watch 功能相關的話題可以直接閱讀 etcd 的源代碼瞭解更加細節的實現。

應用

在上面已經介紹了核心的 Raft 共識算法以及使用的底層存儲之後,這一節更想談一談 etcd 的一些應用場景,與之前談到的 分佈式協調服務 Zookeeper 一樣,etcd 在大多數的集群中還是處於比較關鍵的位置,工程師往往都會使用 etcd 存儲集群中的重要數據和元數據,多個節點之間的強一致性以及集群部署的方式賦予了 etcd 集群高可用性。

我們依然可以使用 etcd 實現微服務架構中的服務發現、發佈訂閱、分佈式鎖以及分佈式協調等功能,因為雖然它被定義成了一個可靠的分佈式鍵值存儲,但是它起到的依然是一個分佈式協調服務的作用,這也使我們在需要不同的協調服務中進行權衡和選擇。

為什麼要在分佈式協調服務中選擇 etcd 其實是一個比較關鍵的問題,很多工程師選擇 etcd 主要是因為它使用 Go 語言開發、部署簡單、社區也比較活躍,但是缺點就在於它相比 Zookeeper 還是一個比較年輕的項目,需要一些時間來成長和穩定。

總結

etcd 的實現原理非常有趣,我們能夠在它的源代碼中學習很多 Go 編程的最佳實踐和設計,這也值得我們去研究它的源代碼。

目前很多項目和公司都在生產環境中大規模使用 etcd,這對於社區來說是意見非常有利的事情,如果微服務的大部分技術棧是 Go,作者也更加推薦各位讀者在選擇分佈式協調服務時選擇 etcd 作為系統的基礎設施。

原文轉載於:https://draveness.me/etcd-introduction


分享到:


相關文章: