Applying the Distributed Component etcd

I. Setting Up the etcd Environment

I will use etcd v3, so let's first set up a single-node etcd environment locally.

docker pull xieyanze/etcd3:latest
docker run --name etcd-v3.0.9 -d -v /tmp:/data \
-p 2379:2379 -p 2380:2380 -p 4001:4001 -p 7001:7001 xieyanze/etcd3:latest
docker exec -it etcd-v3.0.9 sh
The current default is still the v2 API; set the environment variable ETCDCTL_API=3 to switch etcdctl to the v3 API.
export ETCDCTL_API=3
etcdctl put /test/ok 11
etcdctl put /test/ok 22
etcdctl del /test/gg
# delete all keys with the /test prefix
etcdctl del /test --prefix
etcdctl get /test/ok
# prefix query
etcdctl get /test/ok --prefix

II. Software Logical Structure

  1. k8s master cluster

    dev-7

    dev-8

  2. k8s slave cluster 1 env1

    dev-1

    dev-2

    dev-3

  3. k8s slave cluster 2 env2

    dev-4

    dev-5

    dev-6

III. Controller and Agent Service Registration and Discovery

How it works:

Note: in etcd v3, the minimum lease TTL for a key/value is 5 seconds.

  • Every 2 seconds, each service sends a heartbeat to etcd to prove it is still alive (see the Go sketch after this list);

  • When a service exits, it either deletes its etcd key proactively or is taken offline automatically once its TTL expires;

  • When the controller needs the agents' status, it simply GETs [ingress/agent/${env_uuid}/] to see which agents are currently online;

  • When an agent needs the controller's status, it simply GETs [ingress/controller] to see which controllers are currently online.
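
The code in Section V implements this heartbeat by re-putting the key on a 2-second ticker bound to a 5-second lease. As an alternative, here is a minimal self-contained Go sketch of the same idea using clientv3's built-in lease KeepAlive, which renews the lease in the background; the endpoint, key, and value are illustrative only and not part of the article's code.

package main

import (
	"fmt"
	"time"

	"github.com/coreos/etcd/clientv3"
	"golang.org/x/net/context"
)

// registerWithLease writes a service key bound to a lease and keeps the lease
// alive in the background. If the process stops renewing it (crash, network
// loss), the key disappears once the TTL expires (minimum 5 seconds in etcd v3).
func registerWithLease(cli *clientv3.Client, key, value string, ttl int64) (clientv3.LeaseID, error) {
	lease, err := cli.Grant(context.Background(), ttl)
	if err != nil {
		return 0, err
	}
	if _, err := cli.Put(context.Background(), key, value, clientv3.WithLease(lease.ID)); err != nil {
		return 0, err
	}
	// KeepAlive renews the lease periodically; draining the channel is enough here.
	ch, err := cli.KeepAlive(context.Background(), lease.ID)
	if err != nil {
		return 0, err
	}
	go func() {
		for range ch {
			// lease renewed
		}
	}()
	return lease.ID, nil
}

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"},
		DialTimeout: 3 * time.Second,
	})
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	leaseID, err := registerWithLease(cli, "/ingress/agent/env1/dev-1_001", "1", 5)
	if err != nil {
		panic(err)
	}

	time.Sleep(30 * time.Second)

	// Graceful offline: revoke the lease so the key is removed immediately
	// instead of waiting for the TTL to expire.
	cli.Revoke(context.Background(), leaseID)
	fmt.Println("offline")
}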

IV. Implementing the Business Logic

1. controller side:

  • Clients call the controller's RESTful API; the controller writes the data directly to etcd and also writes a copy to MySQL;

  • If the controller cares about agent changes, it only needs to watch the ingress/agent directory;

  • The controller is stateless; its instances do not need to synchronize data with each other, so the instance count can be scaled freely;

  • If a controller crashes, on restart it reloads the data from MySQL and syncs it back into etcd (see the sketch below).
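
As a rough illustration of the last point, a restart-recovery routine could read the saved copies back out of MySQL and replay them into etcd. The sketch below reuses the ControllerClient and EtcdHelper types defined in Section V; the ingressRow struct and the loadConfigsFromMySQL callback are hypothetical stand-ins for whatever persistence layer is actually used.

// ingressRow mirrors a (hypothetical) MySQL copy of one ingress config.
type ingressRow struct {
	EnvUUID string
	UUID    string
	Config  string
}

// restoreFromMySQL replays every saved config back into etcd after a
// controller restart. loadConfigsFromMySQL is a placeholder for the real
// database query.
func (cc *ControllerClient) restoreFromMySQL(loadConfigsFromMySQL func() ([]ingressRow, error)) error {
	rows, err := loadConfigsFromMySQL()
	if err != nil {
		return err
	}
	for _, r := range rows {
		key := fmt.Sprintf("/ingress/ingress_config/%s/%s", r.EnvUUID, r.UUID)
		if err := cc.Helper.SetValue(key, r.Config); err != nil {
			return err
		}
	}
	return nil
}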

2. The controller needs to know the rule execution status

Each agent writes its execution status directly into the config status. To check progress, first fetch the list of agents currently registered under the ingress/agent/env1 directory, then compare it with the completion feedback entries under the ingress/ingress_agent/env1/${config_uuid1}/status directory; when a status entry exists for every agent, the rule has been applied successfully everywhere, as sketched below.
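
A rough sketch of that comparison, reusing the EtcdHelper wrapper from Section V; it assumes the status key layout written by UpdateIngressStatus in the agent code below (/ingress/ingress_config_status/${env}/${uuid}/${agent}) and that "strings" is added to the imports.

// allAgentsApplied reports whether every agent currently online in env has
// written a completion entry for the config identified by uuid.
func allAgentsApplied(h *EtcdHelper, env string, uuid string) bool {
	agents := h.GetValue(fmt.Sprintf("/ingress/agent/%s/", env))
	status := h.GetValue(fmt.Sprintf("/ingress/ingress_config_status/%s/%s/", env, uuid))

	// Collect the names of agents that have reported completion.
	done := make(map[string]bool)
	for _, kv := range status {
		parts := strings.Split(kv.Key, "/")
		done[parts[len(parts)-1]] = true
	}

	// Every online agent must appear in the status list.
	for _, kv := range agents {
		parts := strings.Split(kv.Key, "/")
		if !done[parts[len(parts)-1]] {
			return false
		}
	}
	return len(agents) > 0
}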

3. agent side:

  • Agents in different clusters use etcd's watch feature to learn about every data change (create, delete, update) the moment it happens;

  • Agents in different clusters also fetch the full list for their own environment every 3 minutes and reconcile the related information (see the sketch after this list);

  • If an agent crashes, on restart it loads all ingress_config entries from etcd once.
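
The article's own code leaves this periodic resync commented out; the sketch below shows what the loop could look like, built on the AgentClient defined in Section V (the interval and stop-channel handling are illustrative). It could be started with, for example, agent1.StartPeriodicSync(3 * time.Minute).

// StartPeriodicSync re-reads all ingress configs for this agent's environment
// on a fixed interval and reconciles them, complementing the realtime watch.
func (ac *AgentClient) StartPeriodicSync(interval time.Duration) {
	go func() {
		// Initial full sync shortly after startup or restart.
		ac.SyncIngressConfigs()

		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ac.StopCha:
				return
			case <-ticker.C:
				ac.SyncIngressConfigs()
			}
		}
	}()
}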

V. Code Examples

1. Wrapping etcd clientv3

  • Connection management, with TLS support;

  • Put, delete, and get operations, with support for values that expire automatically;

  • Watch a key or key prefix (directory) for value changes (PUT, DELETE).


package main

import (
"fmt"
"time"
// "github.com/coreos/etcd/pkg/transport"
"github.com/coreos/etcd/clientv3"
"golang.org/x/net/context"
// "sync"

)


type EtcdData struct {
Key string
Value string
}

type EtcdHelper struct {
RequestTimeout time.Duration
Client *clientv3.Client
}

func NewEtcdHelper() *EtcdHelper {

//tlsInfo := transport.TLSInfo{
// CertFile: "/tmp/test-certs/test-name-1.pem",
// KeyFile: "/tmp/test-certs/test-name-1-key.pem",
// TrustedCAFile: "/tmp/test-certs/trusted-ca.pem",
//}
//tlsInfo := transport.TLSInfo{
// CertFile: "./tls/apiserver.crt",
// KeyFile: "./tls/apiserver.key",
//}
//tlsConfig, err := tlsInfo.ClientConfig()
//if err != nil {
// fmt.Printf("%s", err.Error())
// return nil
//}

//cli, err := clientv3.New(clientv3.Config{
// Endpoints: []string{"dev-7:2379"},
// DialTimeout: 3 * time.Second,
// TLS: tlsConfig,
//})

cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
//Endpoints: []string{"http://dev-7:2379"},
DialTimeout: 3 * time.Second,
})

if err != nil {
fmt.Printf("%s", err.Error())
return nil
}

return &EtcdHelper{
RequestTimeout: 5 *time.Second,
Client: cli,

}
}

func (c *EtcdHelper) Release() {
if c.Client != nil {
c.Client.Close()
}
}

func (c *EtcdHelper) PutValue(key string, value string, ttl int64) error {
ctx, cancel := context.WithTimeout(context.Background(), c.RequestTimeout)
defer cancel()

// minimum lease TTL is 5-second
resp, err := c.Client.Grant(context.TODO(), ttl)
if err != nil {
fmt.Printf("%s\n", err.Error())
return err
}

_, err = c.Client.Put(ctx, key, value, clientv3.WithLease(resp.ID))
if err != nil {
fmt.Printf("%s\n", err.Error())
return err
}

return nil
}

func (c *EtcdHelper) SetValue(key string, value string) error {
ctx, cancel := context.WithTimeout(context.Background(), c.RequestTimeout)
defer cancel()

_, err := c.Client.Put(ctx, key, value)
if err != nil {
fmt.Printf("%s\n", err.Error())
return err
}

return nil
}

func (c *EtcdHelper) GetValue(key string) []EtcdData {
ctx, cancel := context.WithTimeout(context.Background(), c.RequestTimeout)
defer cancel()
resp, err := c.Client.Get(ctx, key, clientv3.WithPrefix())
if err != nil {
fmt.Printf("%s\n", err.Error())
return nil
}


var kv_slice []EtcdData
for _, ev := range resp.Kvs {
//fmt.Printf("%s : %s\n", ev.Key, ev.Value)

kv := EtcdData{string(ev.Key), string(ev.Value)}
kv_slice = append(kv_slice, kv)
}

return kv_slice
}

func (c *EtcdHelper) DelValue(key string) error {
ctx, cancel := context.WithTimeout(context.Background(), c.RequestTimeout)
defer cancel()
_, err := c.Client.Delete(ctx, key, clientv3.WithPrefix())
if err != nil {
fmt.Printf("%s\n", err.Error())
return err
}

return nil
}

func (c *EtcdHelper) Watch(key string) {
rch := c.Client.Watch(context.Background(), key, clientv3.WithPrefix())
for wresp := range rch {
for _, ev := range wresp.Events {
fmt.Printf("watch %s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
}
}
}

func (c *EtcdHelper) Listen(key string) clientv3.WatchChan {
return c.Client.Watch(context.Background(), key, clientv3.WithPrefix())
}

2. Controller implementation

  • Controller online/offline functions;

  • The controller periodically sends heartbeats to etcd;

  • The controller watches for agent changes; points 1-3 together implement service registration and discovery;

  • The controller pushes configuration down to etcd, notifying every agent that watches for ingress_config changes.


////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
type ControllerClient struct {
Period time.Duration
Name string
IP string
Helper *EtcdHelper

StopCha chan int

//Lock *sync.Mutex
}

func NewControllerClient(name string, host_ip string) *ControllerClient {
return &ControllerClient{
Period: 2,
Name: name,
IP: host_ip,
Helper: NewEtcdHelper(),
StopCha: make(chan int, 10),
//Lock: new(sync.Mutex),
}
}

func (cc *ControllerClient) Init(display bool) {
go func() {
cc.OnLine()

ticker := time.NewTicker(cc.Period * time.Second)
defer ticker.Stop()
for {
select {
case <-cc.StopCha:
fmt.Printf("online goroutine exited.\n")
return
case <-ticker.C:
cc.OnLine()
}
}
}()



if display {
go func() {
watch_chan := cc.Helper.Listen("/ingress/agent")
for wresp := range watch_chan {
for _, ev := range wresp.Events {
fmt.Printf("watch %s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
}
}
}()
}
}

func (cc *ControllerClient) OnLine() {
key := fmt.Sprintf("/ingress/controller/%s", cc.Name)

//cc.Lock.Lock()
//defer cc.Lock.Unlock()
err := cc.Helper.PutValue(key, "1", 5)

if err != nil {
fmt.Printf(err.Error())
}
}

func (cc *ControllerClient) OffLine() {
close(cc.StopCha)

key := fmt.Sprintf("/ingress/controller/%s", cc.Name)

//cc.Lock.Lock()
//defer cc.Lock.Unlock()
err := cc.Helper.DelValue(key)

if err != nil {
fmt.Printf(err.Error())
}
}

func (cc *ControllerClient) GetIngressConfig(env_uuid string, uuid string) []EtcdData {
//TODO. first save to mysql.
key := fmt.Sprintf("/ingress/ingress_config/%s/%s", env_uuid, uuid)
//cc.Lock.Lock()
//defer cc.Lock.Unlock()
return cc.Helper.GetValue(key)


}

func (cc *ControllerClient) SetIngressConfig(env_uuid string, uuid string, config string) {
//TODO. first save to mysql.

key := fmt.Sprintf("/ingress/ingress_config/%s/%s", env_uuid, uuid)
//cc.Lock.Lock()
//defer cc.Lock.Unlock()
err := cc.Helper.SetValue(key, config)

if err != nil {
fmt.Printf(err.Error())
}
}

func (cc *ControllerClient) DelIngressConfig(env_uuid string, uuid string) {
//TODO. first update to mysql.

key := fmt.Sprintf("/ingress/ingress_config/%s/%s", env_uuid, uuid)

//cc.Lock.Lock()
//defer cc.Lock.Unlock()
err := cc.Helper.DelValue(key)

if err != nil {
fmt.Printf(err.Error())
}
}

3. Agent implementation

  • Agent online/offline functions;

  • The agent periodically sends heartbeats to etcd;

  • The agent watches for controller changes; points 1-3 together implement service registration and discovery;

  • The agent watches for ingress_config changes and applies configuration creations/updates and deletions in real time.


type AgentClient struct {
LivePeriod time.Duration
FirstConfigPerid time.Duration
SyncConfigPeriod time.Duration


Name string
EnvUUID string
IP string
Helper *EtcdHelper

StopCha chan struct{}
}

func NewAgentClient(name string, env_uuid string, host_ip string) *AgentClient {
return &AgentClient{
LivePeriod: 2,
FirstConfigPerid: 3,
SyncConfigPeriod: 60,


Name: name,
EnvUUID: env_uuid,
IP: host_ip,
Helper: NewEtcdHelper(),
StopCha: make(chan struct{}, 1),
}
}

func (ac *AgentClient) Init(display bool) {
//I'm still alive, please don't take me offline.
go func() {
ac.OnLine()

ticker := time.NewTicker(ac.LivePeriod * time.Second)
defer ticker.Stop()
for {
select {
case <-ac.StopCha:
return
case <-ticker.C:
ac.OnLine()
}
}
}()

//if display {
// go func() {
// watch_chan := cc.Helper.Listen("/ingress/agent")
// for wresp := range watch_chan {

// for _, ev := range wresp.Events {
// fmt.Printf("watch %s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
// }
// }
// }()
//}

//After a restart: run a first full sync, then sync periodically.
//go func() {
//
// time.Sleep(time.Second * ac.FirstConfigPerid)
// ac.SyncIngressConfigs()
//
// for {
// select {
// case // return
// case // ac.SyncIngressConfigs()
// }
// }
//}()

if display {
//watch for controller changes (TODO: re-establish the watch after an automatic reconnect when the connection drops)
go func() {
watch_chan := ac.Helper.Listen("/ingress/controller")
for wresp := range watch_chan {
for _, ev := range wresp.Events {
fmt.Printf("watch %s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
}
}
}()
}

//watch for ingress_config changes in this environment (TODO: re-establish the watch after an automatic reconnect when the connection drops)
go func() {
key := fmt.Sprintf("/ingress/ingress_config/%s", ac.EnvUUID)
watch_chan := ac.Helper.Listen(key)
for wresp := range watch_chan {
for _, ev := range wresp.Events {
fmt.Printf("watch %s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)

switch ev.Type.String() {
case "PUT":
fmt.Printf("agent=%s SetIngressConfig(%s, %s)\n", ac.Name, ev.Kv.Key, ev.Kv.Value)
//TODO: SetIngressConfig(key, value)

break
case "DELETE":
fmt.Printf("agent=%s DelIngressConfig(%s)\n", ac.Name, ev.Kv.Key)
//TODO: DelIngressConfig(key)
break
}
}
}
}()
}

func (ac *AgentClient) OnLine() {
key := fmt.Sprintf("/ingress/agent/%s/%s", ac.EnvUUID, ac.Name)
err := ac.Helper.PutValue(key, fmt.Sprintf(`{"name":"%s", "env_uuid":"%s", "ip":"%s"}`, ac.Name, ac.EnvUUID, ac.IP), 5)
if err != nil {
fmt.Printf(err.Error())
}
}

func (ac *AgentClient) OffLine() {
close(ac.StopCha)

key := fmt.Sprintf("/ingress/agent/%s/%s", ac.EnvUUID, ac.Name)
err := ac.Helper.DelValue(key)
if err != nil {
fmt.Printf(err.Error())
}
}

func (ac *AgentClient) UpdateIngressStatus(uuid string) {
key := fmt.Sprintf("/ingress/ingress_config_status/%s/%s/%s", ac.EnvUUID, uuid, ac.Name)
//report that this agent has finished applying the config (the value "1" is illustrative)
err := ac.Helper.SetValue(key, "1")
if err != nil {
fmt.Printf(err.Error())
}
}

//Called once right after a service restart, and then reused for periodic sync.
func (ac *AgentClient) SyncIngressConfigs() {
key := fmt.Sprintf("/ingress/ingress_config/%s", ac.EnvUUID)
kv_slice := ac.Helper.GetValue(key)
if kv_slice != nil {
//TODO: ingressConfig.SyncIngressConfigs(kv_slice)
for _, kv := range kv_slice {
fmt.Printf("name=%s, key:%s-----value:%s\n", ac.Name, kv.Key, kv.Value)
}
}
}


////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
func main() {

controller1 := NewControllerClient("dev-7_001", "192.168.0.10")
controller1.Init(false)
controller2 := NewControllerClient("dev-8_002", "192.168.0.11")
controller2.Init(false)
controller3 := NewControllerClient("dev-8_003", "192.168.0.12")
controller3.Init(false)


agent1 := NewAgentClient("dev-1_001", "1", "192.168.0.1")
agent1.Init(false)

agent2 := NewAgentClient("dev-2_001", "1", "192.168.0.2")
agent2.Init(false)

agent3 := NewAgentClient("dev-3_001", "1", "192.168.0.3")
agent3.Init(false)

agent4 := NewAgentClient("dev-4_001", "1", "192.168.0.4")
agent4.Init(false)

agent5 := NewAgentClient("dev-5_001", "1", "192.168.0.5")
agent5.Init(false)

agent6 := NewAgentClient("dev-6_001", "1", "192.168.0.6")
agent6.Init(false)

agent7 := NewAgentClient("dev-7_001", "1", "192.168.0.7")
agent7.Init(false)

agent8 := NewAgentClient("dev-8_001", "1", "192.168.0.8")
agent8.Init(false)

agent9 := NewAgentClient("dev-9_001", "1", "192.168.0.9")
agent9.Init(false)

agent10 := NewAgentClient("dev-10_001", "1", "192.168.0.10")
agent10.Init(false)


time.Sleep(time.Second*1)
controller3.SetIngressConfig("1", "0001", `{"config":"helloworld"}`)
controller3.DelIngressConfig("1", "0001")

controller3.SetIngressConfig("1", "0002", `{"config":"helloworld"}`)
controller3.DelIngressConfig("1", "0002")

controller3.SetIngressConfig("1", "0003", `{"config":"helloworld"}`)
controller3.DelIngressConfig("1", "0003")

controller3.SetIngressConfig("1", "0004", `{"config":"helloworld"}`)
controller3.DelIngressConfig("1", "0004")

controller3.SetIngressConfig("1", "0005", `{"config":"helloworld"}`)
controller3.DelIngressConfig("1", "0005")




//block forever so the background goroutines keep running
forever := make(chan struct{})
<-forever
}

Summary

Automatic reconnection is not implemented here; the code is for demonstration only, and that part will be completed in a follow-up. As a core component of Kubernetes, etcd can be brought into production code in this way. On the agent side, even when there are many agent instances, watching like this is very efficient.


