360技术团队:300行Go代码玩转RPC

在本篇文章中将通过用300行纯Golang编写简单的RPC框架来解释RPC。希望能帮助大家梳理RPC相关知识点。

我们通过从头开始在Golang中构建一个简单的RPC框架来学习RPC基础构成。

什么是RPC

简单地说,服务A想调用服务B的函数。但是这两个服务不在同一个内存空间中。所以不能直接调用它。

因此,为了实现这个调用,我们需要表达如何调用以及如何通过网络传递通信的语义。

让我们考虑一下,当我们可以在相同的内存空间(本地调用)中运行时,我们要怎么做。

type User struct {
\tName string
\tAge int
}
var userDB = map[int]User{
\t1: User{"Ankur", 85},
\t9: User{"Anand", 25},
\t8: User{"Ankur Anand", 27},
}
func QueryUser(id int) (User, error) {
\tif u, ok := userDB[id]; ok {
\t\treturn u, nil
\t}
\treturn User{}, fmt.Errorf("id %d not in user db", id)
}
func main() {
\tu , err := QueryUser(8)
\tif err != nil {
\t\tfmt.Println(err)
\t\treturn
\t}
\tfmt.Printf("name: %s, age: %d \\n", u.Name, u.Age)

}

现在我们如何在网络上进行相同的函数调用

客户端将通过网络调用 QueryUser(id int) 函数,并且将有一个服务端提供对该函数的调用,并返回响应 User{"Name", id}, nil。

网络传输数据格式

我们将采用TLV(定长报头+变长消息体)编码方案来规范 tcp 上的数据传输。稍后会详细介绍

在通过网络发送数据之前,我们需要定义如何通过网络发送数据的结构。

这有助于我们定义一个通用协议,客户端和服务端都可以理解这个协议。(protobuf IDL定义了服务端和客户端都能理解的内容)。

因此,服务端接收到的数据、要调用的函数名和参数列表,或者来自客户端的数据都需要传递这些参数。

另外,让我们约定第二个返回值的类型为 error,表示RPC调用结果。

// RPC数据传输格式
type RPCdata struct {
\tName string // name of the function

\tArgs []interface{} // request's or response's body expect error.
\tErr string // Error any executing remote server
}

现在我们有了一个格式,我们需要序列化它以便我们可以通过网络发送它。在本例中,我们将使用 go 默认的二进制序列化协议进行编码和解码。

// be sent over the network.
func Encode(data RPCdata) ([]byte, error) {
\tvar buf bytes.Buffer
\tencoder := gob.NewEncoder(&buf)
\tif err := encoder.Encode(data); err != nil {
\t\treturn nil, err
\t}
\treturn buf.Bytes(), nil
}
// Decode the binary data into the Go struct
func Decode(b []byte) (RPCdata, error) {
\tbuf := bytes.NewBuffer(b)
\tdecoder := gob.NewDecoder(buf)
\tvar data RPCdata
\tif err := decoder.Decode(&data); err != nil {
\t\treturn Data{}, err
\t}
\treturn data, nil
}

网络传输

选择 TLV 协议的原因是由于其非常容易实现,同时也完成了我们需要识别的数据读取的长度,因为我们需要确定这个请求读取的字节数的传入请求流。发送和接收都执行相同的操作。

// Transport will use TLV protocol
type Transport struct {
\tconn net.Conn // Conn is a generic stream-oriented network connection.
}

// NewTransport creates a Transport
func NewTransport(conn net.Conn) *Transport {
\treturn &Transport{conn}
}
// Send TLV data over the network
func (t *Transport) Send(data []byte) error {
\t// we will need 4 more byte then the len of data
\t// as TLV header is 4bytes and in this header
\t// we will encode how much byte of data
\t// we are sending for this request.
\tbuf := make([]byte, 4+len(data))
\tbinary.BigEndian.PutUint32(buf[:4], uint32(len(data)))
\tcopy(buf[4:], data)
\t_, err := t.conn.Write(buf)
\tif err != nil {
\t\treturn err
\t}
\treturn nil
}
// Read TLV sent over the wire
func (t *Transport) Read() ([]byte, error) {
\theader := make([]byte, 4)
\t_, err := io.ReadFull(t.conn, header)
\tif err != nil {
\t\treturn nil, err
\t}
\tdataLen := binary.BigEndian.Uint32(header)
\tdata := make([]byte, dataLen)
\t_, err = io.ReadFull(t.conn, data)
\tif err != nil {
\t\treturn nil, err
\t}
\treturn data, nil
}

现在我们已经定义了数据格式和传输协议。下面我们还需要RPC服务器和RPC客户端的实现。

RPC函数

RPC服务器将接收具有函数名的 RPCData。因此,我们需要维护和映射包含函数名到实际函数映射的函数

// RPCServer ... 

type RPCServer struct {
\taddr string
\tfuncs map[string] reflect.Value
}
// Register the name of the function and its entries
func (s *RPCServer) Register(fnName string, fFunc interface{}) {
\tif _,ok := s.funcs[fnName]; ok {
\t\treturn
\t}
\ts.funcs[fnName] = reflect.ValueOf(fFunc)
}

现在我们已经注册了 func,当我们收到请求时,我们将检查函数执行期间传递的func的名称是否存在。然后执行相应的操作

// Execute the given function if present
func (s *RPCServer) Execute(req RPCdata) RPCdata {
\t// get method by name
\tf, ok := s.funcs[req.Name]
\tif !ok {
\t\t// since method is not present
\t\te := fmt.Sprintf("func %s not Registered", req.Name)
\t\tlog.Println(e)
\t\treturn RPCdata{Name: req.Name, Args: nil, Err: e}
\t}
\tlog.Printf("func %s is called\\n", req.Name)
\t// unpackage request arguments
\tinArgs := make([]reflect.Value, len(req.Args))
\tfor i := range req.Args {
\t\tinArgs[i] = reflect.ValueOf(req.Args[i])
\t}
\t// invoke requested method
\tout := f.Call(inArgs)
\t// now since we have followed the function signature style where last argument will be an error
\t// so we will pack the response arguments expect error.
\tresArgs := make([]interface{}, len(out) - 1)
\tfor i := 0; i < len(out) - 1; i ++ {
\t\t// Interface returns the constant value stored in v as an interface{}.
\t\tresArgs[i] = out[i].Interface()
\t}
\t// pack error argument
\tvar er string
\tif e, ok := out[len(out) - 1].Interface().(error); ok {
\t\t// convert the error into error string value
\t\ter = e.Error()

\t}
\treturn RPCdata{Name: req.Name, Args: resArgs, Err: er}
}

RPC客户端

由于函数的具体实现在服务器端,客户端只有函数的原型,所以我们需要调用函数的完整原型,这样我们才能调用它。

func (c *Client) callRPC(rpcName string, fPtr interface{}) {
\tcontainer := reflect.ValueOf(fPtr).Elem()
\tf := func(req []reflect.Value) []reflect.Value {
\t\tcReqTransport := NewTransport(c.conn)
\t\terrorHandler := func(err error) []reflect.Value {
\t\t\toutArgs := make([]reflect.Value, container.Type().NumOut())
\t\t\tfor i := 0; i < len(outArgs)-1; i++ {
\t\t\t\toutArgs[i] = reflect.Zero(container.Type().Out(i))
\t\t\t}
\t\t\toutArgs[len(outArgs)-1] = reflect.ValueOf(&err).Elem()
\t\t\treturn outArgs
\t\t}
\t\t// Process input parameters
\t\tinArgs := make([]interface{}, 0, len(req))
\t\tfor _, arg := range req {
\t\t\tinArgs = append(inArgs, arg.Interface())
\t\t}
\t\t// ReqRPC
\t\treqRPC := RPCdata{Name: rpcName, Args: inArgs}
\t\tb, err := Encode(reqRPC)
\t\tif err != nil {
\t\t\tpanic(err)
\t\t}
\t\terr = cReqTransport.Send(b)
\t\tif err != nil {
\t\t\treturn errorHandler(err)
\t\t}
\t\t// receive response from server
\t\trsp, err := cReqTransport.Read()
\t\tif err != nil { // local network error or decode error
\t\t\treturn errorHandler(err)
\t\t}
\t\trspDecode, _ := Decode(rsp)
\t\tif rspDecode.Err != "" { // remote server error
\t\t\treturn errorHandler(errors.New(rspDecode.Err))
\t\t}

\t\tif len(rspDecode.Args) == 0 {
\t\t\trspDecode.Args = make([]interface{}, container.Type().NumOut())
\t\t}
\t\t// unpackage response arguments
\t\tnumOut := container.Type().NumOut()
\t\toutArgs := make([]reflect.Value, numOut)
\t\tfor i := 0; i < numOut; i++ {
\t\t\tif i != numOut-1 { // unpackage arguments (except error)
\t\t\t\tif rspDecode.Args[i] == nil { // if argument is nil (gob will ignore "Zero" in transmission), set "Zero" value
\t\t\t\t\toutArgs[i] = reflect.Zero(container.Type().Out(i))
\t\t\t\t} else {
\t\t\t\t\toutArgs[i] = reflect.ValueOf(rspDecode.Args[i])
\t\t\t\t}
\t\t\t} else { // unpackage error argument
\t\t\t\toutArgs[i] = reflect.Zero(container.Type().Out(i))
\t\t\t}
\t\t}
\t\treturn outArgs
\t}
\tcontainer.Set(reflect.MakeFunc(container.Type(), f))
}

测试一下我们的框架

package main
import (
\t"encoding/gob"
\t"fmt"
\t"net"
)
type User struct {
\tName string
\tAge int
}
var userDB = map[int]User{
\t1: User{"Ankur", 85},
\t9: User{"Anand", 25},
\t8: User{"Ankur Anand", 27},
}
func QueryUser(id int) (User, error) {
\tif u, ok := userDB[id]; ok {
\t\treturn u, nil
\t}
\treturn User{}, fmt.Errorf("id %d not in user db", id)
}
func main() {
\t// new Type needs to be registered
\tgob.Register(User{})
\taddr := "localhost:3212"

\tsrv := NewServer(addr)
\t// start server
\tsrv.Register("QueryUser", QueryUser)
\tgo srv.Run()
\t// wait for server to start.
\ttime.Sleep(1 * time.Second)
\t// start client
\tconn, err := net.Dial("tcp", addr)
\tif err != nil {
\t\tpanic(err)
\t}
\tcli := NewClient(conn)
\tvar Query func(int) (User, error)
\tcli.callRPC("QueryUser", &Query)
\tu, err := Query(1)
\tif err != nil {
\t\tpanic(err)
\t}
\tfmt.Println(u)
\tu2, err := Query(8)
\tif err != nil {
\t\tpanic(err)
\t}
\tfmt.Println(u2)
}

执行:go run main.go

输出内容

2019/07/23 20:26:18 func QueryUser is called
{Ankur 85}
2019/07/23 20:26:18 func QueryUser is called
{Ankur Anand 27}

总结

致此我们简单的RPC框架就实现完成了,旨在帮大家理解RPC的原理及上手简单实践。

原文来自:360技术团队

关注TechTree,关注互联网一线大厂技术干货。


分享到:


相關文章: