用 Golang 實現基於 Redis 的安全高效 RPC 通信

前言

RPC(Remote Procedure Call),翻譯過來為“遠程過程調用”,是一種分佈式系統中服務或節點之間的有效通信機制。通過 RPC,某個節點(或客戶端)可以很輕鬆的調用遠端(或服務端)的方法或服務,就像在本地調用一樣簡單。現有的很多 RPC 框架都要求暴露服務端地址,也就是需要知道服務器的 IP 和 RPC 端口。而本篇文章將介紹一種不需要暴露 IP 地址和端口的 RPC 通信方式。這種方式是基於 Redis BRPOP/BLPOP 操作實現的延遲隊列,以及 Golang 中的 goroutine 協程異步機制,整個框架非常簡單和易於理解,同時也很高效、穩定和安全。這種方式已經應用到了

Crawlab 中的節點通信當中,成為了各節點即時傳輸信息的主要方式。下面我們將從 Crawlab 早期節點通信方案 PubSub 開始,介紹當時遇到的問題和解決方案,然後如何過渡到現在的 RPC 解決方案,以及它是如何在 Crawlab 中發揮作用的。

基於 PubSub 的方案

PubSub

早期的 Crawlab 是基於 Redis 的 PubSub,也就是發佈訂閱模式。這是 Redis 中主要用於一對多的單向通信的方案。其用法非常簡單:

<code>SUBSCRIBE channel1 channel2 ...
PUBLISH channelx message
/<code>

Redis的 PubSub 可以用作廣播模式,即一個發佈者對應多個訂閱者。而在Crawlab中,我們只有一個訂閱者對應一個發佈者的情況(主節點->工作節點: nodes:<node> )或一個訂閱者對應多個發佈者的情況(工作節點->主節點: nodes:master> )。這是為了方便雙向通信。/<node>

以下為節點通信原理示意圖。

用 Golang 實現基於 Redis 的安全高效 RPC 通信

各個節點會通過Redis的 PubSub 功能來做相互通信。

所謂 PubSub ,簡單來說是一個發佈訂閱模式。訂閱者(Subscriber)會在Redis上訂閱(Subscribe)一個通道,其他任何一個節點都可以作為發佈者(Publisher)在該通道上發佈(Publish)消息。

通信架構

在 Crawlab 中,主節點會訂閱 nodes:master 通道,其他節點如果需要向主節點發送消息,只需要向 nodes:master 發佈消息就可以了。同理,各工作節點會各自訂閱一個屬於自己的通道 nodes:<node> ( node_id 是MongoDB裡的節點ID,是MongoDB ObjectId),如果需要給工作節點發送消息,只需要發佈消息到該通道就可以了。/<node>

一個網絡請求的簡單過程如下:

  1. 客戶端(前端應用)發送請求給主節點(API);
  2. 主節點通過Redis PubSub 的 <nodes> 通道發佈消息給相應的工作節點;/<nodes>
  3. 工作節點收到消息之後,執行一些操作,並將相應的消息通過 <master> 通道發佈給主節點;/<master>
  4. 主節點收到消息之後,將消息返回給客戶端。

不是所有節點通信都是雙向的,也就是說,主節點只會單方面對工作節點通信,工作節點並不會返回響應給主節點,所謂的單向通信。以下是Crawlab的通信類型。

chan 和 goroutine

如果您在閱讀 Crawlab 源碼,會發現節點通信中有大量的 chan 語法,這是 Golang 的一個併發特性。

chan 表示為一個通道,在 Golang 中分為無緩衝和有緩衝的通道,我們用了無緩衝通道來阻塞協程,只有當 chan 接收到信號( chan

go 命令會起一個 goroutine (協程)來完成併發,配合 chan ,該協程可以利用無緩衝通道掛起,等待信號執行接下來的操作。

基於延遲隊列的方案

PubSub 方案的問題

PubSub 這種消息訂閱-發佈設計模式是一種有效的實現節點通信的方式,但是它有兩個問題:

  1. PubSub 的數據是即時的,會隨著 Redis 宕機而丟失;
  2. 寫基於 PubSub 的通信服務會要求用到 goroutine 和 channel ,這加大了開發難度,降低了可維護性。

其中,第二個問題是比較棘手的。如果我們希望加入更多的功能,需要寫大量的異步代碼,這會加大系統模塊間的耦合度,造成擴展性很差,而且代碼閱讀起來很痛苦。

因此,為了解決這個問題,我們採用了基於 Redis 延遲消息隊列的 RPC 服務。

延遲隊列架構

下圖是基於延遲隊列架構的 RPC 實現示意圖。

用 Golang 實現基於 Redis 的安全高效 RPC 通信

每一個節點都有一個客戶端(Client)和服務端(Server)。客戶端用於發送消息到目標節點(Target Node)並接收其返回的消息,服務端用於接收、處理源節點(Source Node)的消息並返回消息給源節點的客戶端。

整個 RPC 通信的流程如下:

  1. 源節點的客戶端通過 LPUSH 將消息推送到 Redis 的 nodes:<node> 中,並執行 BRPOP nodes:<node>: 阻塞並監聽這個消息隊列;/<node>/<node>
  2. 目標節點的服務端通過 BRPOP 一直在監聽 nodes:<node> ,收到消息後,通過消息中的 Method 字段執行對應的程序;/<node>
  3. 目標節點執行完畢後,服務端通過 LPUSH 將消息推送到 Redis 的 nodes:<node>: 中;/<node>
  4. 由於源節點客戶端一直在監聽 nodes:<node>: 這個消息隊列,當目標節點服務端推送消息到這個隊列後,源節點客戶端將立即收到返回的消息,再做後續處理。/<node>

這樣,整個節點的通信流程就通過 Redis 完成了。這樣做的好處在於不用暴露 HTTP 的 IP 地址和端口,只需要知道節點 ID 即可完成 RPC 通信。

這樣設計後的 RPC 代碼比較容易理解和維護。每次需要擴展新的通信類別時,只需要繼承 rpc.Service 類,實現 ClientHandle (客戶端處理方法)和 ServerHandle (服務端處理方法)方法就可以了。

這裡多說一下 BRPOP 。它將移出並獲取消息隊列的最後一個元素, 如果消息隊列沒有元素會阻塞隊列直到等待超時或發現可彈出元素為止。因此,使用 BRPOP 命令相對於輪訓或其他方式,可以避免不間斷的請求 Redis,避免浪費網絡和計算資源。

如果對 Redis 的操作命令不熟悉的,可以參考一下掘金小冊 《Redis 深度歷險:核心原理與應用實踐》 ,這本小冊深入介紹了 Redis 的原理以及工程實踐,對於應用 Redis 到實際開發中非常實用。

代碼實踐

講了這麼多理論知識,我們還是需要看看代碼的。老師常教育我們:“Talk is cheap. Show me the code.”

由於 Crawlab 後端是 Golang 開發的,要理解以下代碼需要一些 Golang 的基礎知識。

消息數據結構

首先我們需要定一個傳輸消息的數據結構。代碼如下。

<code>package entity

type RpcMessage struct {
Id string `json:"id"` // 消息ID
Method string `json:"method"` // 消息方法
NodeId string `json:"node_id"` // 節點ID
Params map[string]string `json:"params"` // 參數
Timeout int `json:"timeout"` // 超時
Result string `json:"result"` // 結果
Error string `json:"error"` // 錯誤
}/<code>

這裡,我們定義了消息 ID、方法、節點 ID、參數等字段。消息 ID 是 UUID,保證了消息 ID 的唯一性。

基礎接口

首先,我們定義一個抽象基礎接口,方便讓實際業務邏輯模塊繼承。服務端的處理邏輯在 ServerHandle 中,返回 entity 裡的 RpcMessage ,而客戶端的邏輯在 ClientHandle 中。

<code>// RPC服務基礎類
type Service interface {
ServerHandle() (entity.RpcMessage, error)
ClientHandle() (interface{}, error)
}/<code>

客戶端通用方法

當我們調用客戶端的通用方法的時候,需要實現兩個邏輯:

  1. 發送消息:生成消息 ID,將消息序列化為 JSON,LPUSH 推入 Redis 消息隊列;
  2. 通過 BRPOP 延遲獲取返回的消息,返回給調用方。

以下是實現的代碼。

<code>// 客戶端處理消息函數
func ClientFunc(msg entity.RpcMessage) func() (entity.RpcMessage, error) {
return func() (replyMsg entity.RpcMessage, err error) {
// 請求ID
msg.Id = uuid.NewV4().String()

// 發送RPC消息
msgStr := utils.ObjectToString(msg)
if err := database.RedisClient.LPush(fmt.Sprintf("rpc:%s", msg.NodeId), msgStr); err != nil {
log.Errorf("RpcClientFunc error: " + err.Error())
debug.PrintStack()
return replyMsg, err
}

// 獲取RPC回覆消息
dataStr, err := database.RedisClient.BRPop(fmt.Sprintf("rpc:%s:%s", msg.NodeId, msg.Id), msg.Timeout)
if err != nil {
log.Errorf("RpcClientFunc error: " + err.Error())
debug.PrintStack()
return replyMsg, err
}

// 反序列化消息
if err := json.Unmarshal([]byte(dataStr), &replyMsg); err != nil {
log.Errorf("RpcClientFunc error: " + err.Error())
debug.PrintStack()
return replyMsg, err
}

// 如果返回消息有錯誤,返回錯誤
if replyMsg.Error != "" {
return replyMsg, errors.New(replyMsg.Error)
}

return
}
}/<code>

服務端處理

服務端處理的邏輯如下,大致的邏輯是:

  1. 在一個循環中,通過 BRPOP 獲取該節點對應的消息;
  2. 當獲取到消息時,生成一個 goroutine 異步處理該消息;
  3. 繼續等待。

您可以在 InitRpcService 這個方法中看到上述邏輯。私有方法 handleMsg 實現了序列化、調用服務端 RPC 服務方法、發送返回消息的邏輯。如果需要拓展 RPC 方法類型,在工廠類方法 GetService 裡添加就可以了。

<code>// 獲取RPC服務
func GetService(msg entity.RpcMessage) Service {
switch msg.Method {
case constants.RpcInstallLang:
return &InstallLangService{msg: msg}
case constants.RpcInstallDep:
return &InstallDepService{msg: msg}
case constants.RpcUninstallDep:
return &UninstallDepService{msg: msg}
case constants.RpcGetLang:
return &GetLangService{msg: msg}
case constants.RpcGetInstalledDepList:
return &GetInstalledDepsService{msg: msg}
}
return nil
}

// 處理RPC消息
func handleMsg(msgStr string, node model.Node) {
// 反序列化消息
var msg entity.RpcMessage
if err := json.Unmarshal([]byte(msgStr), &msg); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
}

// 獲取service
service := GetService(msg)

// 根據Method調用本地方法
replyMsg, err := service.ServerHandle()
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
}

// 發送返回消息
if err := database.RedisClient.LPush(fmt.Sprintf("rpc:%s:%s", node.Id.Hex(), replyMsg.Id), utils.ObjectToString(replyMsg)); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
}
}

// 初始化服務端RPC服務
func InitRpcService() error {
go func() {
for {
// 獲取當前節點
node, err := model.GetCurrentNode()
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
continue
}

// 獲取獲取消息隊列信息
msgStr, err := database.RedisClient.BRPop(fmt.Sprintf("rpc:%s", node.Id.Hex()), 0)
if err != nil {
if err != redis.ErrNil {
log.Errorf(err.Error())
debug.PrintStack()
}
continue
}

// 處理消息
go handleMsg(msgStr, node)
}
}()
return nil
}/<code>

調用方法例子

Crawlab 的節點上經常需要為爬蟲安裝一些第三方依賴,例如 pymongo、requests 等。而其中,我們也需要直到某個節點上是否已經安裝了某個依賴,這需要跨服務器通信,也就是需要在分佈式網絡中進行雙向通信。而這個邏輯是通過 RPC 實現的。主節點向目標節點發起 RPC 調用,目標節點運行被調用方法,將運行結果也就是安裝的依賴列表返回給客戶端,客戶端再返回給調用者。

下面的代碼實現了獲取目標節點上已安裝的依賴的 RPC 方法。

<code>// 獲取已安裝依賴服務
// 繼承Service基礎類
type GetInstalledDepsService struct {
msg entity.RpcMessage
}

// 服務端處理方法
// 重載ServerHandle
func (s *GetInstalledDepsService) ServerHandle() (entity.RpcMessage, error) {
lang := utils.GetRpcParam("lang", s.msg.Params)
deps, err := GetInstalledDepsLocal(lang)
if err != nil {
s.msg.Error = err.Error()
return s.msg, err
}
resultStr, _ := json.Marshal(deps)
s.msg.Result = string(resultStr)
return s.msg, nil
}

// 客戶端處理方法
// 重載ClientHandle
func (s *GetInstalledDepsService) ClientHandle() (o interface{}, err error) {

// 發起 RPC 請求,獲取服務端數據
s.msg, err = ClientFunc(s.msg)()
if err != nil {
return o, err
}

// 反序列化
var output []entity.Dependency
if err := json.Unmarshal([]byte(s.msg.Result), &output); err != nil {
return o, err
}
o = output

return
}/<code>

發起調用

寫好了 RPC 服務端和客戶端處理方法,就可以輕鬆編寫調用邏輯了。以下是調用獲取遠端已安裝依賴列表的方法。首先由 GetService 工廠類獲取之前定義好的 GetInstalledDepsService ,再調用其客戶端處理方法 ClientHandle ,然後返回結果。這就像在本地調用方法一樣。是不是很簡單?

<code>// 獲取遠端已安裝依賴
func GetInstalledDepsRemote(nodeId string, lang string) (deps []entity.Dependency, err error) {
params := make(map[string]string)
params["lang"] = lang
s := GetService(entity.RpcMessage{
NodeId: nodeId,
Method: constants.RpcGetInstalledDepList,
Params: params,
Timeout: 60,
})
o, err := s.ClientHandle()
if err != nil {
return
}
deps = o.([]entity.Dependency)
return
}/<code>

結語

本篇文章主要介紹了一種基於 Redis 延遲隊列的 RPC 通信方式,這種方式不用暴露各個節點或服務的 IP 地址或端口,是一種非常安全的方式。而且,這種方式已經用 Golang 在 Crawlab 中實現了雙向通信,特別是 Golang 中的天生支持異步的 goroutine,讓這種方式的實現變得簡單。實際上,這種方式理論上是非常高效的,能夠支持高併發數據傳輸。

但是,在 Crawlab 的實現中還存在一些隱患,也就是它並沒有限制服務端的處理併發數量。因此如果傳輸消息過多時,服務端資源會被佔滿,導致處理速度變慢甚至宕機的風險。修復方式是在服務端限制併發數量。另外,限於時間的原因,作者還沒有來得及測試這種 RPC 通信方式的實際傳輸效率,容錯機制也沒有加入。因此總的來說還有很大的提升和優化空間。

雖然如此,這種方式對於 Crawlab 的低併發遠程通信來說是足夠的了,在實際使用中也沒有出現問題,非常穩定。對於隱秘性有要求、希望不暴露地址信息的開發者,我們也推薦將該種方式在實際應用中嘗試。


分享到:


相關文章: