什麼是etcd?什麼是grpc?為什麼要使用etcd。本文將簡單對etcd與grpc介紹與代碼實現。
編輯器原因導致代碼有點亂,如果需要查看原文www.studygolang.com/articles/26935#reply0
go語言中文文檔:www.topgoer.com
etcd
etcd是一個高可用的鍵值分佈式存儲系統,主要用於共享配置和服務發現。etcd使用Go語言編寫,並通過Raft一致性算法處理日誌複製以保證強一致性。Raft通過選舉的方式來實現一致性,在Raft中,任何一個節點都可能成為Leader。k8s也使用了etcd。
Raft算法:Leader領導者: 處理所有客戶端交互,日誌複製等,一般一次只有一個Leader.Follower信徒: 類似選民,完全被動Candidate候選人: 可以被選為一個新的領導人。
- docker-compose安裝etcd v3
官方elcolio/etcd鏡像是v2版本,所以這裡使用的是bitnami/etcd鏡像,
docker-compose.yaml
version: '3' services: etcd1: image: bitnami/etcd container_name: etcd1 ports: - 2379:2379 environment: - ALLOW_NONE_AUTHENTICATION=yes - ETCD_NAME=etcd1 - ETCD_INITIAL_ADVERTISE_PEER_URLS=http://etcd1:2380 - ETCD_LISTEN_PEER_URLS=http://0.0.0.0:2380 - ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379 - ETCD_ADVERTISE_CLIENT_URLS=http://etcd1:2379 - ETCD_INITIAL_CLUSTER_TOKEN=etcd-cluster - ETCD_INITIAL_CLUSTER=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380 - ETCD_INITIAL_CLUSTER_STATE=new etcd2: image: bitnami/etcd container_name: etcd2 ports: - 22379:2379 environment: - ALLOW_NONE_AUTHENTICATION=yes - ETCD_NAME=etcd2 - ETCD_INITIAL_ADVERTISE_PEER_URLS=http://etcd2:2380 - ETCD_LISTEN_PEER_URLS=http://0.0.0.0:2380 - ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379 - ETCD_ADVERTISE_CLIENT_URLS=http://etcd2:2379 - ETCD_INITIAL_CLUSTER_TOKEN=etcd-cluster - ETCD_INITIAL_CLUSTER=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380 - ETCD_INITIAL_CLUSTER_STATE=new etcd3: image: bitnami/etcd container_name: etcd3 ports: - 32379:2379 environment: - ALLOW_NONE_AUTHENTICATION=yes - ETCD_NAME=etcd3 - ETCD_INITIAL_ADVERTISE_PEER_URLS=http://etcd3:2380 - ETCD_LISTEN_PEER_URLS=http://0.0.0.0:2380 - ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379 - ETCD_ADVERTISE_CLIENT_URLS=http://etcd3:2379 - ETCD_INITIAL_CLUSTER_TOKEN=etcd-cluster - ETCD_INITIAL_CLUSTER=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380 - ETCD_INITIAL_CLUSTER_STATE=new
由於etcd沒有自帶管理界面,可以使用e3w。
- go的etcd v3安裝包
go get -u -v go.etcd.io/etcd/clientv3
- etcd服務註冊
服務註冊:主要思路是創建一個lease租約,put一個前綴的Key(方便服務發現時根據前綴取key對應的值);然後通過keepAlive續約,並監聽keepAlive通道保持在線,如果不監聽,etcd會刪除這個租約上的key。
服務註冊代碼:registry.go
package etcd import ( "context" "encoding/json" "errors" "go.etcd.io/etcd/clientv3" "log" "time" ) // 服務信息 type ServiceInfo struct { Name string IP string } type Service struct { ServiceInfo ServiceInfo stop chan error leaseId clientv3.LeaseID client *clientv3.Client } // NewService 創建一個註冊服務 func NewService(info ServiceInfo, endpoints []string) (service *Service, err error) { client, err := clientv3.New(clientv3.Config{ Endpoints: endpoints, DialTimeout: time.Second * 200, }) if err != nil { log.Fatal(err) return nil, err } service = &Service{ ServiceInfo: info, client: client, } return } // Start 註冊服務啟動 func (service *Service) Start() (err error) { ch, err := service.keepAlive() if err != nil { log.Fatal(err) return } for { select { case err :=
- etcd服務發現(grpc)
服務發現:通過前綴取出key對應的values;然後啟動一個監聽服務監聽key的變化
服務發現代碼(grpc):grpc_resolver.go
package etcd import ( "context" "encoding/json" "fmt" "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/mvcc/mvccpb" "google.golang.org/grpc/resolver" "log" ) const schema = "etcd" // resolver is the implementaion of grpc.resolve.Builder // Resolver 實現grpc的grpc.resolve.Builder接口的Build與Scheme方法 type Resolver struct { endpoints []string service string cli *clientv3.Client cc resolver.ClientConn } // NewResolver return resolver builder // endpoints example: http://127.0.0.1:2379 http://127.0.0.1:12379 http://127.0.0.1:22379" // service is service name func NewResolver(endpoints []string, service string) resolver.Builder { return &Resolver{endpoints: endpoints, service: service} } // Scheme return etcd schema func (r *Resolver) Scheme() string { // 最好用這種,因為grpc resolver.Register(r)在註冊時,會取scheme,如果一個系統有多個grpc發現,就會覆蓋之前註冊的 return schema + "_" + r.service } // ResolveNow func (r *Resolver) ResolveNow(rn resolver.ResolveNowOption) { } // Close func (r *Resolver) Close() { } // Build to resolver.Resolver // 實現grpc.resolve.Builder接口的方法 func (r *Resolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) { var err error r.cli, err = clientv3.New(clientv3.Config{ Endpoints: r.endpoints, }) if err != nil { return nil, fmt.Errorf("grpclb: create clientv3 client failed: %v", err) } r.cc = cc // go r.watch(fmt.Sprintf("/%s/%s/", schema, r.service)) go r.watch(fmt.Sprintf(r.service)) return r, nil } func (r *Resolver) watch(prefix string) { addrDict := make(map[string]resolver.Address) update := func() { addrList := make([]resolver.Address, 0, len(addrDict)) for _, v := range addrDict { addrList = append(addrList, v) } r.cc.NewAddress(addrList) } resp, err := r.cli.Get(context.Background(), prefix, clientv3.WithPrefix()) if err == nil { for i, kv := range resp.Kvs { info := &ServiceInfo{} err := json.Unmarshal([]byte(kv.Value), info) if err != nil { } addrDict[string(resp.Kvs[i].Value)] = resolver.Address{Addr: info.IP} } } update() rch := r.cli.Watch(context.Background(), prefix, clientv3.WithPrefix(), clientv3.WithPrevKV()) for n := range rch { for _, ev := range n.Events { switch ev.Type { case mvccpb.PUT: info := &ServiceInfo{} err := json.Unmarshal([]byte(ev.Kv.Value), info) if err != nil { log.Println(err) } else { addrDict[string(ev.Kv.Key)] = resolver.Address{Addr: info.IP} } case mvccpb.DELETE: delete(addrDict, string(ev.PrevKv.Key)) } } update() } }
grpc
- RPC是什麼
在分佈式計算,遠程過程調用(英語:Remote Procedure Call,縮寫為 RPC)是一個計算機通信協議。該協議允許運行於一臺計算機的程序調用另一個地址空間(通常為一個開放網絡的一臺計算機)的子程序,而程序員就像調用本地程序一樣,無需額外地為這個交互作用編程(無需關注細節)。RPC是一種服務器-客戶端(Client/Server)模式,經典實現是一個通過發送請求-接受回應進行信息交互的系統。 - gRPC是什麼
gRPC是一種現代化開源的高性能RPC框架,能夠運行於任意環境之中。最初由谷歌進行開發。它使用HTTP/2作為傳輸協議。
在gRPC裡,客戶端可以像調用本地方法一樣直接調用其他機器上的服務端應用程序的方法,是你更容易創建分佈式應用程序和服務。與許多RPC系統一樣,gRPC是基於定義一個服務,指定一個可以遠程調用的帶有參數和返回類型的的方法。在服務端程序中實現這個接口並且運行gRPC服務處理客戶端調用。在客戶端,有一個stub提供和服務端相同的方法。
image.png
- 為什麼要用gRPC
使用gRPC, 我們可以一次性的在一個.proto文件中定義服務並使用任何支持它的語言去實現客戶端和服務端,反過來,它們可以應用在各種場景中,從Google的服務器到你自己的平板電腦—— gRPC幫你解決了不同語言及環境間通信的複雜性。使用protocol buffers還能獲得其他好處,包括高效的序列號,簡單的IDL以及容易進行接口更新。總之一句話,使用gRPC能讓我們更容易編寫跨語言的分佈式代碼。 - 安裝grpc
go get -u google.golang.org/grpc
- 安裝Protocol Buffers v3
安裝用於生成gRPC服務代碼的協議編譯器,最簡單的方法是從下面的鏈接:https://github.com/google/protobuf/releases下載適合你平臺的預編譯好的二進制文件(protoc-<version>-<platform>.zip)。/<platform>/<version>
下載完之後,執行下面的步驟:
- 解壓下載好的文件
- 把protoc二進制文件的路徑加到環境變量中
接下來執行下面的命令安裝protoc的Go插件:
go get -u github.com/golang/protobuf/protoc-gen-go
編譯插件protoc-gen-go將會安裝到$GOBIN,默認是$GOPATH/bin,它必須在你的$PATH中以便協議編譯器protoc能夠找到它。
- grpc簡單示例
目錄結構
. ├── mail │ ├── Makefile │ ├── client_test.go │ └── server.go ├── proto │ ├── mail │ ├── mail.pb.go │ └── mail.proto
編寫proto代碼
syntax = "proto3"; // 版本聲明,使用Protocol Buffers v3版本 package g.srv.mail; // 包名 // 創建一個郵件服務 service MailService { rpc SendMail (MailRequest) returns (MailResponse) { } } // 請求消息 message MailRequest { string Mail = 1; string Text = 2; } // 響應消息 message MailResponse { bool Ok = 1; }
Makefile文件
build: protoc --proto_path=../proto --go_out=plugins=grpc:../proto ../proto/mail/mail.proto
執行make build就會生成mail.pb.go文件
編寫grpc服務端代碼
package main import ( "context" "fmt" "google.golang.org/grpc" "google.golang.org/grpc/reflection" "log" "net" pb "chen/app/srvs/proto/mail" "chen/pkg/etcd" ) type service struct { } func (s *service) SendMail(ctx context.Context, req *pb.MailRequest) (res *pb.MailResponse, err error) { fmt.Printf("郵箱:%s;發送內容:%s", req.Mail, req.Text) return &pb.MailResponse{ Ok: true, }, nil } func main() { // 監聽本地的8972端口 lis, err := net.Listen("tcp", ":8972") if err != nil { fmt.Printf("failed to listen: %v", err) return } s := grpc.NewServer() // 創建gRPC服務器 pb.RegisterMailServiceServer(s, &service{}) // 在gRPC服務端註冊服務 reflection.Register(s) //在給定的gRPC服務器上註冊服務器反射服務 // Serve方法在lis上接受傳入連接,為每個連接創建一個ServerTransport和server的goroutine。 // 該goroutine讀取gRPC請求,然後調用已註冊的處理程序來響應它們。 //etcd服務註冊 reg, err := etcd.NewService(etcd.ServiceInfo{ Name: "g.srv.mail", IP: "127.0.0.1:8972", //grpc服務節點ip }, []string{"127.0.0.1:2379", "127.0.0.1:22379", "127.0.0.1:32379"}) // etcd的節點ip if err != nil { log.Fatal(err) } go reg.Start() if err := s.Serve(lis); err != nil { fmt.Println(err) } }
編寫grpc客戶端代碼
package main import ( "context" "fmt" "google.golang.org/grpc" "time" //"google.golang.org/grpc/balancer/roundrobin" "google.golang.org/grpc/resolver" "log" "testing" //"time" pb "chen/app/srvs/proto/mail" "chen/pkg/etcd" ) func TestService_SendMail(t *testing.T) { r := etcd.NewResolver([]string{ "127.0.0.1:2379", "127.0.0.1:22379", "127.0.0.1:32379", }, "g.srv.mail") resolver.Register(r) ctx, _ := context.WithTimeout(context.Background(), 3*time.Second) // https://github.com/grpc/grpc/blob/master/doc/naming.md // The gRPC client library will use the specified scheme to pick the right resolver plugin and pass it the fully qualified name string. addr := fmt.Sprintf("%s:///%s", r.Scheme(), "g.srv.mail" /*g.srv.mail經測試,這個可以隨便寫,底層只是取scheme對應的Build對象*/) conn, err := grpc.DialContext(ctx, addr, grpc.WithInsecure(), // grpc.WithBalancerName(roundrobin.Name), //指定初始化round_robin => balancer (後續可以自行定製balancer和 register、resolver 同樣的方式) grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`), grpc.WithBlock()) // 這種方式也行 //conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithBalancerName("round_robin")) //conn, err := grpc.Dial(":8972", grpc.WithInsecure()) if err != nil { log.Fatalf("failed to dial: %v", err) } /*conn, err := grpc.Dial( fmt.Sprintf("%s://%s/%s", "consul", GetConsulHost(), s.Name), //不能block => blockkingPicker打開,在調用輪詢時picker_wrapper => picker時若block則不進行robin操作直接返回失敗 //grpc.WithBlock(), grpc.WithInsecure(), //指定初始化round_robin => balancer (後續可以自行定製balancer和 register、resolver 同樣的方式) grpc.WithBalancerName(roundrobin.Name), //grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`), ) //原文鏈接:https://blog.csdn.net/qq_35916684/article/details/104055246*/ if err != nil { panic(err) } c := pb.NewMailServiceClient(conn) resp, err := c.SendMail(context.TODO(), &pb.MailRequest{ Mail: "[email protected]", Text: "test,test", }) log.Print(resp) }
運行代碼就能看到效果了。
閱讀更多 go的慢慢學習路 的文章