etcd API使用

连接客户端

访问etcd首先要创建client,它需要传入一个Config配置.

Endpoints:etcd的多个节点服务地址。 DialTimeout:创建client的首次连接超时时间,这里传了5秒,如果5秒都没有连接成功就会返回err;一旦client创建成功,不用再关心后续底层连接的状态了,client内部会重连。

 cli,err := clientv3.New(clientv3.Config{
Endpoints:[]string{"localhost:2379"},
DialTimeout: 5 * time.Second,
})

返回的 client,它的类型具体如下:

type Client struct {
Cluster
KV
Lease
Watcher
Auth
Maintenance
Username string
Password string
}

类型中的成员是etcd客户端几何核心功能模块的具体实现:

Cluster:向集群里增加删除etcd节点,集群管理

KV:K-V键值库

Lease:租约相关操作,租约过期会删除授权的key

Watcher:观察订阅,从而监听最新的数据变化

Auth:管理etcd的用户和权限,属于管理员操作

Maintenance:维护etcd,比如主动迁移etcd的leader节点

KV增删改查

kv对象

type KV interface {
// Put puts a key-value pair into etcd.
Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)
// Get retrieves keys.
Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)
// Delete deletes a key, or optionally using WithRange(end), [key, end).
Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error)
// Compact compacts etcd KV history before the given rev.
Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error)
// Do applies a single Op on KV without a transaction.
Do(ctx context.Context, op Op) (OpResponse, error)
// Txn creates a transaction.
Txn(ctx context.Context) Txn
}

KV存储读取删除

  1. kv对象实例获取 kv := clientev3.NewKV(client)
  2. 存储Put
putResp, err := kv.Put(context.TODO(),"/key/example", "hello")
  1. Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)

参数介绍:

ctx: Context包对象,是用来跟踪上下文的,列如超时控制

key: 存储对象的key

val: 存储对象的value

opts: 可变参数,额外选项

  1. 查询Get getResp, err := kv.Get(context.TODO(), "/key/example")
  2. 删除Delete delREsp, err := kv.Delete(context.TODO(), "/key/example")
  3. WithPrefix
rangeResp, err := kv.Get(context.TODO(), "/key/example", clientv3.WithPrefix())
  1. WithPrefix()是指查找以/key/example为前缀的所有key

租约lease

租约对象

type Lease interface {
// Grant creates a new lease. 分配租约
Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error)
// Revoke revokes the given lease. 撤销租约
Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error)
// TimeToLive retrieves the lease information of the given lease ID. 获取ttl剩余时间
TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error)
// Leases retrieves all leases. 列举所有租约
Leases(ctx context.Context) (*LeaseLeasesResponse, error)
// KeepAlive keeps the given lease alive forever
// 租约自动定时续期
KeepAlive(ctx context.Context, id LeaseID) ( // KeepAliveOnce renews the lease once. In most of the cases, KeepAlive
// should be used instead of KeepAliveOnce.
//续期一次租约
KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error)
// Close releases all resources Lease keeps for efficient communication
// with the etcd server.
Close() error
}

租约使用

  1. 获取租约
  2. 设置租约ttl
  3. 对keyPut时使用租约,租约到期key自动删除
  4. 主动给Lease进行续约
lease := clientv3.NewLease(client)
grantResp, err := lease.Grant(context.TODO(), 10)
kv.Put(context.TODO(), "/example/expireme", "lease-go", clientv3.WithLease(grantResp.ID))
keepResp, err := lease.KeepAlive(context.TODO(), grantResp.ID)
// sleep一会...

事务Tnx

etcd中事务是原子执行的,针对kv存储操作的if … then … else结构.将请求归并到一起放在原子块中.事务可以保护key不受其他并发更新操作的影响.

etcd的实现方式有些不同,它的事务是基于cas方式实现的。在事务执行过程中,client和etcd之间没有维护事务会话,在commit事务时,它的“冲突判断(If)和执行过程Then/Else”一次性提交给etcd,etcd来作为一个原子过程来执行“If-Then-Else”。所以,etcd事务不会发生阻塞,无论成功,还是失败,都会立即返回,需要应用进行失败(发生冲突)重试。因此,这也就要求业务代码是可重试的。

type Txn interface {
// If takes a list of comparison. If all comparisons passed in succeed,

// the operations passed into Then() will be executed. Or the operations
// passed into Else() will be executed.
If(cs ...Cmp) Txn
// Then takes a list of operations. The Ops list will be executed, if the
// comparisons passed in If() succeed.
Then(ops ...Op) Txn
// Else takes a list of operations. The Ops list will be executed, if the
// comparisons passed in If() fail.
Else(ops ...Op) Txn
// Commit tries to commit the transaction.
Commit() (*TxnResponse, error)
}
  1. 通过kv对象开启一个事务
  2. if then else 书写事务
  3. 最后Commit提交整个Txn事务
txn := kv.Txn(context.TODO())
txnResp, err := txn.If(clientv3.Compare(clientv3.Value("/hi"), "=","hello")).
Then(clientv3.OpGet("/hi")).
Else(clientv3.OpGet("/test/",clientv3.WithPrefix())).
Commit()
if txnResp.Succeeded { // If = true
fmt.Println("~~~", txnResp.Responses[0].GetResponseRange().Kvs)
} else { // If =false
fmt.Println("!!!", txnResp.Responses[0].GetResponseRange().Kvs)
}

这个Value(“/hi”)返回的Cmp表达了:“/hi这个key对应的value”

利用Compare函数来继续为”主语”增加描述, 形成完整的条件语句:/hi这个key对应的value必须等于"hello"

监听watch

watch接口

type Watcher interface { 

// Watch watches on a key or prefix. The watched events will be returned
// through the returned channel.
// If the watch is slow or the required rev is compacted, the watch request
// might be canceled from the server-side and the chan will be closed.
// 'opts' can be: 'WithRev' and/or 'WithPrefix'.
Watch(ctx context.Context, key string, opts ...OpOption) WatchChan
// Close closes the watcher and cancels all watch requests.
Close() error
}

Watch 方法返回一个WatchChan 类似的变量,该通道传递WatchResponse类型

//创建监听
wc := client.Watch(context.TODO(), "/job/v3/1", clientv3.WithPrevKV())
//range 监听事件
for v := range wc {
for _, e := range v.Events {
log.Printf("type:%v kv:%v val:%v prevKey:%v \\n ", e.Type, string(e.Kv.Key),e.Kv.Value, e.PrevKv)
}
}
etcd API使用


分享到:


相關文章: