分布式组件etcd应用

技术漫谈 | 分布式组件etcd应用

一、搭建etcd环境

我将使用 etcd v3版本,本地先搭建一个单机版本的ETCD环境。

docker pull xieyanze/etcd3:latest
docker run --name etcd-v3.0.9 -d -v /tmp:/data \
-p 2379:2379 -p 2380:2380 -p 4001:4001 -p 7001:7001 xieyanze/etcd3:latest
docker exec -it etcd-v3.0.9 sh
当前默认还是v2版本,通过设定环境变量export ETCDCTL_API=3,设置成V3版本。
export ETCDCTL_API=3
etcdctl put /test/ok 11
etcdctl put /test/ok 22
etcdctl del /test/gg
#删除所有/test前缀的节点
etcdctl del /test --prefix
etcdctl get /test/ok
# 前缀查询
etcdctl get /test/ok --prefix

二、软件逻辑结构

技术漫谈 | 分布式组件etcd应用

  1. k8s master cluster

    dev-7

    dev-8

  2. k8s slave cluster 1 env1

    dev-1

    dev-2

    dev-3

  3. k8s slave cluster 2 env2

    dev-4

    dev-5

    dev-6

三、controller 与 agent 服务注册与发现

实现原理:

注意: etcd v3版本,k/v 的超时时间TTL最小为5秒钟。

  • 每2秒钟,每个服务向etcd发送一次心跳包,证明自己还活着;

  • 当服务退出时,主动删除etcd的key或者等到TTL超时之后,自动下线;

  • controller需要获得agent的状态,直接GET [ingress/agent/${env_uuid}/]就能获得当前agent在线状态;

  • agent需要获得controller的状态,直接GET [ingress/controller]就能获得当前controller在线状态。

技术漫谈 | 分布式组件etcd应用

技术漫谈 | 分布式组件etcd应用

四、软件业务的实现

1. controller side:

  • 客户端调用controller restful api.controller 直接写入ETCD,同时写入副本到mysql;

  • controller 如果关注于agent的变化,只需要watch ingress/agent这个目录;

  • controller 是无状态,不需要同步多个实例之间的数据,可以任意的scale它的实例数;

  • 如果controller挂掉之后,重启加载mysql的数据库同步到etcd中。

2. controller需要了解规则执行状态

技术漫谈 | 分布式组件etcd应用

技术漫谈 | 分布式组件etcd应用

agent的执行状态直接写入配置状态中,

先获得当前ingress/agent/env1目录下的agent列表,对比ingress/ingress_agent/env1/${config_uuid1}/status目录下的规则完成之后反馈列表,每一个都存在时,则全部执行成功。

3. agent side:

  • 不同集群agent 通过etcd的watch功能在第一时间,获得监听到所有数据的变化 :新建、删除、更新;

  • 不同集群agent 每3分钟定时获得自己环境下的列表信息,同步处理相关信息;

  • 如果agent挂了之后,重启加载一次etcd中所有的ingress_config。

五、代码实例

1. etcd clientv3 的封装

  • 连接管理,支持TLS;

  • 增、删、查,支持自动超时的设值;

  • watch 监听目录或KEY的值的变化(PUT,DELETE)。


package main

import (
"fmt"
"time"
// "github.com/coreos/etcd/pkg/transport"
"github.com/coreos/etcd/clientv3"
"golang.org/x/net/context"
// "sync"

)


// EtcdData is one key/value pair returned from an etcd range query.
type EtcdData struct {
	Key   string // full etcd key, e.g. /ingress/agent/<env>/<name>
	Value string // raw value bytes, decoded as a string
}

// EtcdHelper wraps an etcd clientv3 connection and applies a default
// timeout to every request issued through it.
type EtcdHelper struct {
	RequestTimeout time.Duration    // per-request context timeout for Put/Get/Delete
	Client         *clientv3.Client // underlying etcd v3 client
}

// NewEtcdHelper dials a single local etcd endpoint and wraps the client
// with a 5-second default request timeout. On connection failure the error
// is printed and nil is returned.
//
// TLS support (transport.TLSInfo -> ClientConfig, passed via Config.TLS)
// was present here but is commented out; re-enable it for secure endpoints.
func NewEtcdHelper() *EtcdHelper {
	client, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"},
		DialTimeout: 3 * time.Second,
	})
	if err != nil {
		fmt.Printf("%s", err.Error())
		return nil
	}

	helper := &EtcdHelper{
		RequestTimeout: 5 * time.Second,
		Client:         client,
	}
	return helper
}

// Release closes the underlying etcd client connection, if one exists.
// Safe to call on a helper whose connection attempt failed (nil Client).
func (c *EtcdHelper) Release() {
	if c.Client != nil {
		c.Client.Close()
	}
}

// PutValue writes key=value attached to a freshly granted lease of ttl
// seconds, so the key expires automatically unless refreshed by another
// PutValue. Note: etcd v3 enforces a minimum lease TTL of 5 seconds.
// Errors are printed and returned.
func (c *EtcdHelper) PutValue(key string, value string, ttl int64) error {
	ctx, cancel := context.WithTimeout(context.Background(), c.RequestTimeout)
	defer cancel()

	// BUG FIX: the lease grant previously used context.TODO(), which never
	// times out and could hang the caller; use the bounded ctx instead.
	resp, err := c.Client.Grant(ctx, ttl)
	if err != nil {
		fmt.Printf("%s\n", err.Error())
		return err
	}

	_, err = c.Client.Put(ctx, key, value, clientv3.WithLease(resp.ID))
	if err != nil {
		fmt.Printf("%s\n", err.Error())
		return err
	}

	return nil
}

// SetValue writes key=value without a lease; the key persists until an
// explicit delete. Errors are printed and returned.
func (c *EtcdHelper) SetValue(key string, value string) error {
	ctx, cancel := context.WithTimeout(context.Background(), c.RequestTimeout)
	defer cancel()

	if _, err := c.Client.Put(ctx, key, value); err != nil {
		fmt.Printf("%s\n", err.Error())
		return err
	}
	return nil
}

// GetValue performs a prefix query on key and returns every matching
// key/value pair. Returns nil on error or when nothing matched (the error
// is printed, not returned).
func (c *EtcdHelper) GetValue(key string) []EtcdData {
	ctx, cancel := context.WithTimeout(context.Background(), c.RequestTimeout)
	defer cancel()

	resp, err := c.Client.Get(ctx, key, clientv3.WithPrefix())
	if err != nil {
		fmt.Printf("%s\n", err.Error())
		return nil
	}

	// Nil when empty, matching callers that test `result != nil`.
	var result []EtcdData
	for _, kv := range resp.Kvs {
		result = append(result, EtcdData{
			Key:   string(kv.Key),
			Value: string(kv.Value),
		})
	}
	return result
}

// DelValue deletes key and every key beneath it (prefix delete).
// Errors are printed and returned.
func (c *EtcdHelper) DelValue(key string) error {
	ctx, cancel := context.WithTimeout(context.Background(), c.RequestTimeout)
	defer cancel()

	if _, err := c.Client.Delete(ctx, key, clientv3.WithPrefix()); err != nil {
		fmt.Printf("%s\n", err.Error())
		return err
	}
	return nil
}

// Watch blocks forever, printing every PUT/DELETE event under the given
// key prefix. Debug helper only — use Listen to consume events yourself.
func (c *EtcdHelper) Watch(key string) {
	rch := c.Client.Watch(context.Background(), key, clientv3.WithPrefix())
	for wresp := range rch {
		for _, ev := range wresp.Events {
			fmt.Printf("watch %s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
		}
	}
}

// Listen returns a watch channel for the given key prefix so the caller can
// consume PUT/DELETE events itself. The background context means the watch
// lives until the client is closed.
func (c *EtcdHelper) Listen(key string) clientv3.WatchChan {
	return c.Client.Watch(context.Background(), key, clientv3.WithPrefix())
}

2. controller 的代码实现

  • controller上线,下线功能;

  • controller定时发送心跳包到etcd;

  • controller监听agent的变化.(1-3)完成服务注册与发现;

  • controller通过下发配置到etcd,通知所有watch ingress_config变化的agent。


////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// ControllerClient registers one controller instance in etcd and publishes
// ingress configs for agents to consume.
type ControllerClient struct {
	Period time.Duration // heartbeat interval (NOTE(review): assigned a bare 2, i.e. 2ns — 2 seconds likely intended)
	Name   string        // unique controller name, used in the etcd key
	IP     string        // host IP of this controller instance
	Helper *EtcdHelper   // etcd access wrapper

	StopCha chan int // closed by OffLine to stop the heartbeat goroutine

	//Lock *sync.Mutex
}

// NewControllerClient builds a controller with a 2-second heartbeat period.
// host_ip is recorded but not currently published.
func NewControllerClient(name string, host_ip string) *ControllerClient {
	return &ControllerClient{
		// BUG FIX: a bare 2 assigned to time.Duration is 2 nanoseconds;
		// the heartbeat interval is meant to be 2 seconds.
		Period:  2 * time.Second,
		Name:    name,
		IP:      host_ip,
		Helper:  NewEtcdHelper(),
		StopCha: make(chan int, 10),
		//Lock: new(sync.Mutex),
	}
}

func (cc *ControllerClient) Init(display bool) {
go func() {
cc.OnLine()

for {
select {
case fmt.Printf("online goroutinue is exited.")
return
case cc.OnLine()
}
}
}()



if display {
go func() {
watch_chan := cc.Helper.Listen("/ingress/agent")
for wresp := range watch_chan {
for _, ev := range wresp.Events {
fmt.Printf("watch %s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
}
}
}()
}
}

// OnLine registers (or refreshes) this controller under /ingress/controller
// with a 5-second lease — the minimum TTL etcd v3 allows — so the key
// expires if heartbeats stop.
func (cc *ControllerClient) OnLine() {
	key := fmt.Sprintf("/ingress/controller/%s", cc.Name)

	if err := cc.Helper.PutValue(key, "1", 5); err != nil {
		// BUG FIX: err.Error() was passed as the Printf format string and
		// would misbehave if the message contained '%'.
		fmt.Printf("%s", err.Error())
	}
}

// OffLine deregisters the controller: it stops the heartbeat goroutine by
// closing StopCha, then deletes the registration key (prefix delete).
// NOTE(review): calling OffLine twice panics on the double close — guard
// with sync.Once if repeated calls are possible.
func (cc *ControllerClient) OffLine() {
	close(cc.StopCha)

	key := fmt.Sprintf("/ingress/controller/%s", cc.Name)

	if err := cc.Helper.DelValue(key); err != nil {
		// BUG FIX: use an explicit format verb instead of passing
		// err.Error() as the format string.
		fmt.Printf("%s", err.Error())
	}
}

// GetIngressConfig reads the ingress config stored for (env_uuid, uuid);
// the prefix query returns the key and any children.
func (cc *ControllerClient) GetIngressConfig(env_uuid string, uuid string) []EtcdData {
	//TODO. first save to mysql.
	key := fmt.Sprintf("/ingress/ingress_config/%s/%s", env_uuid, uuid)
	return cc.Helper.GetValue(key)
}

// SetIngressConfig writes an ingress config for (env_uuid, uuid); every
// agent watching /ingress/ingress_config/<env_uuid> receives the PUT event.
func (cc *ControllerClient) SetIngressConfig(env_uuid string, uuid string, config string) {
	//TODO. first save to mysql.

	key := fmt.Sprintf("/ingress/ingress_config/%s/%s", env_uuid, uuid)

	if err := cc.Helper.SetValue(key, config); err != nil {
		// BUG FIX: err.Error() was used as the Printf format string.
		fmt.Printf("%s", err.Error())
	}
}

// DelIngressConfig removes the ingress config for (env_uuid, uuid); every
// agent watching /ingress/ingress_config/<env_uuid> receives the DELETE
// event.
func (cc *ControllerClient) DelIngressConfig(env_uuid string, uuid string) {
	//TODO. first update to mysql.

	key := fmt.Sprintf("/ingress/ingress_config/%s/%s", env_uuid, uuid)

	if err := cc.Helper.DelValue(key); err != nil {
		// BUG FIX: err.Error() was used as the Printf format string.
		fmt.Printf("%s", err.Error())
	}
}

3. agent代码实现

  • agent上线,下线功能;

  • agent定时发送心跳包到etcd;

  • agent监听(watch) controller的变化,(1-3)完成服务注册与发现;

  • agent监听(watch) ingress_config的变化,实时完成更新、设置或删除配置的功能。


// AgentClient registers one agent instance in etcd and watches both
// controller liveness and ingress-config changes for its environment.
type AgentClient struct {
	LivePeriod       time.Duration // heartbeat interval (NOTE(review): assigned a bare 2 — 2 seconds likely intended)
	FirstConfigPerid time.Duration // delay before the first config sync after restart (sic: "Perid")
	SyncConfigPeriod time.Duration // interval for the periodic full re-sync

	Name    string      // unique agent name, used in the etcd key
	EnvUUID string      // environment (cluster) this agent belongs to
	IP      string      // host IP, published in the registration value
	Helper  *EtcdHelper // etcd access wrapper

	StopCha chan struct{} // intended to stop background goroutines (never closed in this version)
}

// NewAgentClient builds an agent bound to environment env_uuid.
// host_ip is published in the registration JSON.
func NewAgentClient(name string, env_uuid string, host_ip string) *AgentClient {
	return &AgentClient{
		// BUG FIX: bare integers assigned to time.Duration are nanoseconds;
		// the intervals below are meant as real wall-clock durations.
		LivePeriod:       2 * time.Second,  // heartbeat
		FirstConfigPerid: 3 * time.Second,  // first sync delay after restart
		SyncConfigPeriod: 60 * time.Second, // periodic full re-sync

		Name:    name,
		EnvUUID: env_uuid,
		IP:      host_ip,
		Helper:  NewEtcdHelper(),
		StopCha: make(chan struct{}, 1),
	}
}

func (ac *AgentClient) Init(display bool) {
//我还活着,不要干掉我.
go func() {
ac.OnLine()

for {
select {
case return
case ac.OnLine()
}
}
}()

//if display {
// go func() {
// watch_chan := cc.Helper.Listen("/ingress/agent")
// for wresp := range watch_chan {

// for _, ev := range wresp.Events {
// fmt.Printf("watch %s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
// }
// }
// }()
//}

//重启之后,第一次同步 和 定期同步.
//go func() {
//
// time.Sleep(time.Second * ac.FirstConfigPerid)
// ac.SyncIngressConfigs()
//
// for {
// select {
// case // return
// case // ac.SyncIngressConfigs()
// }
// }
//}()

if display {
//监听controller变化(等待处理掉线自动重连后,重监听)
go func() {
watch_chan := ac.Helper.Listen("/ingress/controller")
for wresp := range watch_chan {
for _, ev := range wresp.Events {
fmt.Printf("watch %s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
}
}
}()
}

//监听本环境下ingress_config的变化(等待处理掉线自动重连, 重监听)
go func() {
key := fmt.Sprintf("/ingress/ingress_config/%s", ac.EnvUUID)
watch_chan := ac.Helper.Listen(key)
for wresp := range watch_chan {
for _, ev := range wresp.Events {
fmt.Printf("watch %s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)

switch ev.Type.String() {
case "PUT":
fmt.Printf("agent=%s SetIngressConfig(%s, %s)\n", ac.Name, ev.Kv.Key, ev.Kv.Value)
//TODO: SetIngressConfig(key, value)

break
case "DELETE":
fmt.Printf("agent=%s DelIngressConfig(%s)\n", ac.Name, ev.Kv.Key)
//TODO: DelIngressConfig(key)
break
}
}
}
}()
}

// OnLine registers (or refreshes) this agent under its environment with a
// 5-second lease — the minimum TTL etcd v3 allows — storing a small JSON
// blob describing the agent.
func (ac *AgentClient) OnLine() {
	key := fmt.Sprintf("/ingress/agent/%s/%s", ac.EnvUUID, ac.Name)
	value := fmt.Sprintf(`{"name":"%s", "env_uuid":"%s", "ip":"%s"}`, ac.Name, ac.EnvUUID, ac.IP)
	if err := ac.Helper.PutValue(key, value, 5); err != nil {
		// BUG FIX: err.Error() was used as the Printf format string.
		fmt.Printf("%s", err.Error())
	}
}

// OffLine deregisters the agent.
// BUG FIX: the heartbeat goroutine must be stopped first (the close was
// commented out); otherwise OnLine re-creates the key within one LivePeriod
// and the agent never actually goes offline. Calling OffLine twice still
// panics on the double close — guard with sync.Once if that can happen.
func (ac *AgentClient) OffLine() {
	close(ac.StopCha)

	key := fmt.Sprintf("/ingress/agent/%s/%s", ac.EnvUUID, ac.Name)
	if err := ac.Helper.DelValue(key); err != nil {
		// BUG FIX: err.Error() was used as the Printf format string.
		fmt.Printf("%s", err.Error())
	}
}

// UpdateIngressStatus records this agent's execution state for config uuid.
// NOTE(review): the body DELETES the status key, but the name and the
// controller-side aggregation (which checks for the presence of per-agent
// status keys) suggest this should WRITE via SetValue — likely a
// copy-paste from OffLine. Behavior left unchanged pending confirmation.
func (ac *AgentClient) UpdateIngressStatus(uuid string) {
	key := fmt.Sprintf("/ingress/ingress_config_status/%s/%s/%s", ac.EnvUUID, uuid, ac.Name)
	if err := ac.Helper.DelValue(key); err != nil {
		// BUG FIX: err.Error() was used as the Printf format string.
		fmt.Printf("%s", err.Error())
	}
}

//服务重启之后,第一次先调用 并用 定时同步
// SyncIngressConfigs pulls every config under this agent's environment and
// logs each entry. Intended to run once right after a restart and then
// periodically as a safety net alongside the watch.
func (ac *AgentClient) SyncIngressConfigs() {
	prefix := fmt.Sprintf("/ingress/ingress_config/%s", ac.EnvUUID)
	entries := ac.Helper.GetValue(prefix)
	if entries == nil {
		return
	}
	//TODO: ingressConfig.SyncIngressConfigs(entries)
	for _, entry := range entries {
		fmt.Printf("name=%s, key:%s-----value:%s\n", ac.Name, entry.Key, entry.Value)
	}
}


////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// main spins up three controllers and ten agents (all in environment "1"),
// then has one controller push and immediately retract five demo configs so
// the agents' watchers log the PUT/DELETE events.
func main() {
	// Controllers are stateless; instances can be scaled freely.
	controller1 := NewControllerClient("dev-7_001", "192.168.0.10")
	controller1.Init(false)
	controller2 := NewControllerClient("dev-8_002", "192.168.0.11")
	controller2.Init(false)
	controller3 := NewControllerClient("dev-8_003", "192.168.0.12")
	controller3.Init(false)

	// Ten agents registered under environment "1": dev-1_001 at
	// 192.168.0.1 through dev-10_001 at 192.168.0.10.
	for i := 1; i <= 10; i++ {
		agent := NewAgentClient(
			fmt.Sprintf("dev-%d_001", i),
			"1",
			fmt.Sprintf("192.168.0.%d", i),
		)
		agent.Init(false)
	}

	// Give registrations a moment to land before publishing configs.
	time.Sleep(time.Second * 1)

	// Publish then immediately delete five demo configs (uuids 0001-0005);
	// watching agents log each PUT followed by its DELETE.
	for i := 1; i <= 5; i++ {
		uuid := fmt.Sprintf("%04d", i)
		controller3.SetIngressConfig("1", uuid, `{"config":"helloworld"}`)
		controller3.DelIngressConfig("1", uuid)
	}

	// BUG FIX: the original declared `forever := make(chan struct{})`
	// without using it — a compile error in Go. Block forever so the
	// background goroutines keep running.
	select {}
}

总结

本文没有完成自动重连功能,代码仅作演示,后续会完善这部分功能。etcd 作为k8s的核心组件,可以用这样的方式加入生产代码中。在agent侧,agent实例很多的情况下,watch这样的方式效率会很高。



分享到:


相關文章: