etcd基础与使用

etcd基础与使用

1 etcd简介

etcd是一个高可用的分布式的键值对存储系统,常用做配置共享和服务发现,由CoreOS公司发起的一个开源项目,受到ZooKeeper与doozer启发而催生的项目,名称”etcd”源自两个想法,即Unix的”/etc”文件夹和”d”分布式系统。”/etc”文件夹是用于存储单个系统的配置数据的地方,而etcd用于存储大规模分布式的配置信息,etcd有如下特点:

  • 简单:基于HTTP+JSON的API,用curl就可以轻松使用。
  • 可信:使用Raft算法充分实现了分布式。
  • 安全:可选SSL客户认证机制。
  • 快速:每个节点可支持上万QPS读写。

etcd有V2和V3两个版本,V3版本供了更多功能并提高了性能,应用程序使用新的grpc API访问mvcc存储,mvcc存储区和旧存储区v2是分开且隔离的,写入存储v2不会影响mvcc存储,写入mvcc存储也不会影响存储v2。

API v2和API v3之间存在一些显着差异:

  • 事务:在v3中,etcd提供了多键条件事务。应用程序应使用事务代替Compare-And-Swap操作。
  • 平面键空间:API v3中没有目录,只有键。例如,”/a/b/c/“是键。范围查询支持获取与给定前缀匹配的所有键。
  • 紧凑的响应:delete操作不再返回以前的值。为了获得删除的值,可以使用事务原子地获取密钥,然后删除其值。
  • 租约:替代v2 TTL;TTL绑定到租约,密钥附加到租约。TTL过期后,租约将被撤销,所有附加密钥也将被删除。



2 etcd工作原理

etcd集群本身是一个分布式系统,由多个节点相互通信构成整体对外服务,每个节点都存储了完整的数据,并且通过Raft协议保证每个节点维护的数据是一致的,在ETCD集群中任意时刻至多存在一个有效的主节点,由主节点处理所有来自客户端写操作,通过Raft协议保证写操作对状态机的改动会可靠的同步到其他节点,Raft协议如下图所示:

raft协议

Raft协议主要分为三个部分:选举,复制日志,安全性。


2.1 选举

Raft协议是用于维护一组服务节点数据一致性的协议。这一组服务节点构成一个集群,并且有一个主节点来对外提供服务。当集群初始化,或者主节点挂掉后,面临一个选举问题。集群中每个节点,任意时刻处于Leader、Follower、Candidate这三个角色之一,选举特点如下:

  • 当集群初始化时候,每个节点都是Follower角色。
  • 集群中存在至多1个有效的主节点,通过心跳与其他节点同步数据;
  • 当Follower在一定时间内没有收到来自主节点的心跳,会将自己角色改变为Candidate,并发起一次选举投票。
    • 当收到包括自己在内超过半数节点赞成后,选举成功。
    • 当收到票数不足半数选举失败,或者选举超时。
    • 若本轮未选出主节点,将进行下一轮选举(出现这种情况,是由于多个节点同时选举,所有节点均为获得过半选票)。
  • Candidate节点收到来自主节点的信息后,会立即终止选举过程,进入Follower角色。

为了避免陷入选举失败循环,每个节点未收到心跳发起选举的时间是一定范围内的随机值,这样能够避免2个节点同时发起选举。


2.2 复制日志

日志复制是指主节点将每次操作形成日志条目,并持久化到本地磁盘,然后通过网络IO发送给其他节点。其他节点根据日志的逻辑时钟(TERM)和日志编号(INDEX)来判断是否将该日志记录持久化到本地。当主节点收到包括自己在内超过半数节点成功返回,那么认为该日志是可提交的(committed),并将日志输入到状态机,将结果返回给客户端。

这里需要注意的是,每次选举都会形成一个唯一的TERM编号,相当于逻辑时钟,每一条日志都有全局唯一的编号。

主节点通过网络IO向其他节点追加日志。若某节点收到日志追加的消息,首先判断该日志的TERM是否过期,以及该日志条目的INDEX是否比当前以及提交的日志的INDEX跟早。若已过期,或者比提交的日志更早,那么就拒绝追加,并返回该节点当前的已提交的日志的编号。否则将日志追加,并返回成功。

当主节点收到其他节点关于日志追加的回复后,若发现有拒绝,则根据该节点返回的已提交日志编号,发生其编号下一条日志。

主节点向其他节点同步日志,还作了拥塞控制。主节点发现日志复制的目标节点拒绝了某次日志追加消息,将进入日志探测阶段,一条一条发送日志,直到目标节点接受日志,然后进入快速复制阶段,可进行批量日志追加。

按照日志复制的逻辑,我们可以看到,集群中慢节点不影响整个集群的性能。另外一个特点是,数据只从主节点复制到Follower节点,这样大大简化了逻辑流程。Raft日志复制路程如下图所示:

raft复制日志


2.3 安全

选举复制日志并不能保证节点间数据一致。当一个某个节点挂掉了,一段时间后再次重启,并刚好当选为主节点。而在其挂掉这段时间内,集群若有超过半数节点存活,集群会正常工作,那么会有日志提交,这些提交的日志无法传递给挂掉的节点。当挂掉的节点再次当选举节点,它将缺失部分已提交的日志。在这样场景下,按Raft协议,它将自己日志复制给其他节点,会将集群已经提交的日志给覆盖掉,这显然是不可接受的,对于出现这种问题解决办法:

  • 其他协议解决这个问题的办法是,新当选的主节点会询问其他节点,和自己数据对比,确定出集群已提交数据,然后将缺失的数据同步过来。这个方案有明显缺陷,增加了集群恢复服务的时间(集群在选举阶段不可服务),并且增加了协议的复杂度。

  • Raft解决的办法是,在选举逻辑中,对能够成为主节点加以限制,确保选出的节点已定包含了集群已经提交的所有日志。如果新选出的主节点已经包含了集群所有提交的日志,那就不需要从和其他节点比对数据了,简化了流程,缩短了集群恢复服务的时间。

为什么只要仍然有超过半数节点存活,一定能够选出包含所有日志数据的节点作为主节点呢?因为已经提交的日志必然被集群中超过半数节点持久化,显然前一个主节点提交的最后一条日志也被集群中大部分节点持久化。当主节点挂掉后,集群中仍有大部分节点存活,那这存活的节点中一定存在一个节点包含了已经提交的日志了,因此要求etcd集群节点数量为奇数(3,5,7,9……)。



3 ETCD应用场景

3.1 服务发现

ETCD服务发现示意图如下图所示:

服务发现


服务发现是分布式系统中最常见的需要解决的问题之一,即在同一个分布式集群中的进程或服务,客户端通过名字就可以查找和连接服务端。要解决服务发现的问题,需要有下面三点:

  • 一个强一致性、高可用的服务存储目录。基于Raft算法的etcd天生就是这样一个强一致性高可用的服务存储目录。
  • 一种注册服务和监控服务健康状态的机制。用户可以在etcd中注册服务,并且对注册的服务设置key TTL,定时保持服务的心跳以达到监控健康状态的效果。
  • 一种查找和连接服务的机制。通过在etcd指定的主题快速找到服务地址。


(1) 在微服务中使用etcd服务发现

随着Docker容器的流行,多种微服务共同协作,构成一个相对功能强大的组织架构。使用etcd服务发现机制,在etcd中注册某个服务名字的目录,在该目录下存储可用的服务节点的IP。服务使用者从etcd目录下查找可用的服务节点IP来连接和调用,达到透明化的动态添加这些服务目的,示意图如下图所示:

服务发现


(2) 在PaaS平台中使用etcd服务发现

PaaS平台中的应用一般都有多个实例,通过域名不仅可以透明的对多个实例进行访问,而且还可以做到负载均衡。但是应用的某个实例随时都有可能故障重启,这时就需要动态的配置域名解析(路由)信息,通过etcd的服务发现功能就可以轻松解决这个动态配置的问题,实现多实例与实例故障重启透明化目的,示意图如下图所示:

服务发现


3.2 发布订阅消息

etcd的发布订阅消息示意图如下图所示:

edct发布订阅

在分布式系统中,消息发布与订阅最适合使用用在组件之间通信。使用etcd发布订阅功能可以实现一个配置共享中心,数据提供者在配置中心发布消息,消息消费者订阅他们关心的主题,一旦主题有新消息发布,就会实时通知订阅者,通过这种方式可以做到分布式系统配置的集中式管理与动态更新。

etcd发布订阅最典型应用在kubernetes上,其他场景应用:

  • app或服务用到的一些配置信息放到etcd上进行集中管理。在启动的时候主动从etcd获取一次配置信息,在etcd节点上注册一个Watcher并等待,以后每次配置有更新的时候,etcd都会实时通知订阅者,以此达到获取最新配置信息的目的。
  • 分布式搜索服务中,索引的元信息和服务器集群机器的节点状态存放在etcd中,供各个客户端订阅使用。使用etcd的key TTL功能可以确保机器状态是实时更新的。
  • 分布式日志收集系统。 这个系统的核心工作是收集分布在不同机器的日志。收集器通常是按照应用(或主题)来分配收集任务单元,因此可以在etcd上创建一个以应用(主题)命名的目录,并将这个应用(主题相关)的所有机器ip,以子目录的形式存储到目录上,然后设置一个etcd递归的Watcher,递归式的监控应用(主题)目录下所有信息的变动。这样就实现了机器IP(消息)变动的时候,能够实时通知到收集器调整任务分配。
  • 系统中信息需要动态自动获取与人工干预修改信息请求内容的情况。只需要要将信息存放到指定的etcd目录中,etcd的这些目录就可以通过HTTP的接口在外部访问。


3.3 负载均衡

etcd的负载均衡示意图如下图所示:

etcd负载均衡

etcd本身分布式架构存储的信息访问支持负载均衡,etcd集群化以后,每个etcd的核心节点都可以处理用户的请求。所以把数据量小但是访问频繁的消息数据直接存储到etcd中也是个不错的选择。 etcd可以监控一个集群中多个节点的状态,利用etcd维护一个负载均衡节点表,当有一个请求发过来后,可以轮询式的把请求转发给存活着的节点。

分布式系统中,为了保证服务的高可用以及数据的一致性,通常都会把数据和服务部署多份,以此达到对等服务,即使其中的某一个服务失效了,也不影响使用。由此带来的坏处是数据写入性能下降,而好处则是数据访问时的负载均衡。因为每个对等服务节点上都存有完整的数据,所以用户的访问流量就可以分流到不同的机器上。


3.4 分布式通知与协调

这里说到的分布式通知与协调,与消息发布和订阅有些相似。都用到了etcd中Watche机制,通过注册与异步通知机制,实现分布式环境下不同系统之间 的通知与协调,从而对数据变更做到实时处理。实现方式:不同系统都在etcd上对同一个目录进行注册,同时设置Watcher观测该目录的变化(如果对子目录的变化也有需要,可以设置递归模式),当某个系统更新了etcd的目录,那么设置了Watcher的系统就会收到通知,并作出相应处理。

使用etcd完成分布式协同工作原理如下图所示:

etcd通知与协调

  • 通过etcd进行低耦合的心跳检测。检测系统和被检测系统通过etcd上某个目录关联而非直接关联起来,这样可以大大减少系统的耦合性。
  • 通过etcd完成系统调度。某系统有控制台和推送系统两部分组成,控制台的职责是控制推送系统进行相应的推送工作。管理人员在控制台作的一些操作,实际上是修改了etcd上某些目录节点的状态,而etcd就把这些变化通知给注册了Watcher的推送系统客户端,推送系统再作出相应的推送任务。
  • 通过etcd完成工作汇报。大部分类似的任务分发系统,子任务启动后,到etcd来注册一个临时工作目录,并且定时将自己的进度进行汇报(将进度写入到这个临时目录),这样任务管理者就能够实时知道任务进度。


3.5 分布式锁

因为etcd使用Raft算法保持了数据的强一致性,某次操作存储到集群中的值必然是全局一致的,所以很容易实现分布式锁,锁有两种使用方式:

  • 保持独占,即所有获取锁的用户最终只有一个可以得到。etcd为此提供了一套实现分布式锁原子操作CAS(CompareAndSwap)的API。通过设置prevExist值,可以保证在多个节点同时去创建某个目录时只有一个成功,而创建成功的用户就可以认为是获得了锁。
  • 控制时序,即所有想要获得锁的用户都会被安排执行,但是获得锁的顺序也是全局唯一的,同时决定了执行顺序。etcd为此也提供了一套API(自动创建有序键),对一个目录建值时指定为POST动作,这样etcd会自动在目录下生成一个当前最大的值为键,存储这个新的值(客户端编号)。同时还可以使用API按顺序列出所有当前目录下的键值。此时这些键的值就是客户端的时序,而这些键中存储的值可以是代表客户端的编号。

从etcd获取的分布式锁如下图所示:

分布式锁


3.6 分布式队列

分布式队列的常规用法与场景五中所描述的分布式锁的控制时序用法类似,创建一个先进先出的队列,保证顺序。另一种比较有意思的实现是在保证队列达到某个条件时再统一按顺序执行。这种方法的实现可以在/queue这个目录中另外建立一个/queue/condition节点,condition可以表示信息如下:

  • condition可以表示队列大小。比如一个大的任务需要很多小任务就绪的情况下才能执行,每次有一个小任务就绪,就给这个condition数字加1,直到达到大任务规定的数字,再开始执行队列里的一系列小任务,最终执行大任务,如下图所示: 分布式队列

  • condition可以表示某个任务在不在队列。这个任务可以是所有排序任务的首个执行程序,也可以是拓扑结构中没有依赖的点。通常必须执行这些任务后才能执行队列中的其他任务。

  • condition还可以表示其它的一类开始执行任务的通知。可以由控制程序指定,当condition出现变化时,开始执行队列任务。


3.7 集群监控

使用etcd来实现集群的实时性的监控,可以第一时间检测到各节点的健康状态,以完成集群的监控要求。etcd本身就有自带检点健康监控功能,实现起来也比较简单。

  • 使用Watcher机制,当某个节点消失或有变动时,Watcher会第一时间发现并告知用户。
  • 节点可以设置TTL key,比如每隔30s发送一次心跳使代表该机器存活的节点继续存在,否则节点消失。


3.8 Leader竞选

使用分布式锁,可以完成Leader竞选。这种场景通常是一些长时间CPU计算或者使用IO操作的机器,只需要竞选出的Leader计算或处理一次,就可以把结果复制给其他的Follower,从而避免重复劳动,节省计算资源。

可使用在搜索系统中建立全量索引。如果每个机器都进行一遍索引的建立,不但耗时而且建立索引的一致性不能保证。通过在etcd的CAS机制同时创建一个节点,创建成功的机器作为Leader,进行索引计算,然后把计算结果分发到其它节点。



4 安装

4.1 在docker安装单机版

使用docker-compose.yml脚本如下:

version: "3"
  
services:
  etcd:
    image: quay.io/coreos/etcd
    container_name: etcd-single
    restart: always
    ports:
      - 2379:2379
      - 2380:2380
    volumes:
      - $PWD/etcd-data:/etcd-data
    environment:
      - ETCDCTL_API=3
    command:
      - /usr/local/bin/etcd
      - --data-dir=/etcd-data
      - --name=etcd-single
      - --listen-peer-urls=http://0.0.0.0:2380
      - --listen-client-urls=http://0.0.0.0:2379
      - --initial-advertise-peer-urls=http://0.0.0.0:2380
      - --advertise-client-urls=http://0.0.0.0:2379
      #- --initial-cluster=etcd-single=http://0.0.0.0:2380 # 不指定参数,让etcd自动生成
# 启动etcd服务
docker-compose up -d

# 把容器里的etcdctl客户端复制到本地使用
docker exec -t etcd-single cp /usr/local/bin/etcdctl /etcd-data
sudo mv /etcd-data/etcdctl /usr/local/bin/

# 让etcdctl使用v3版本,和服务端对应
echo 'export ETCDCTL_API=3' >> ~/.bashrc
source  ~/.bashrc

# 查看版本
etcdctl version

# 查看成员
etcdctl member list


4.2 在一台主机上安装docker集群版

version: '3'

services:
  etcd1:
    image: quay.io/coreos/etcd
    container_name: etcd1
    restart: always
    environment:
      - ETCDCTL_API=3
    command:
      - etcd
      - --name=etcd1
      - --data-dir=/etcd-data
      - --advertise-client-urls=http://0.0.0.0:2379
      - --listen-client-urls=http://0.0.0.0:2379
      - --listen-peer-urls=http://0.0.0.0:2380
      - --initial-cluster-token=etcd-cluster
      - --initial-cluster=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380
      - --initial-cluster-state=new
    ports:
      - 23791:2379
      - 23801:2380
    volumes:
      - $PWD/etcd1-data:/etcd-data
    networks:
      - etcd-net

  etcd2:
    image: quay.io/coreos/etcd
    container_name: etcd2
    restart: always
    environment:
      - ETCDCTL_API=3
    command:
      - etcd
      - --name=etcd2
      - --data-dir=/etcd-data
      - --advertise-client-urls=http://0.0.0.0:2379
      - --listen-client-urls=http://0.0.0.0:2379
      - --listen-peer-urls=http://0.0.0.0:2380
      - --initial-cluster-token=etcd-cluster
      - --initial-cluster=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380
      - --initial-cluster-state=new
    ports:
      - 23792:2379
      - 23802:2380
    volumes:
      - $PWD/etcd2-data:/etcd-data
    networks:
      - etcd-net

  etcd3:
    image: quay.io/coreos/etcd
    container_name: etcd3
    restart: always
    environment:
      - ETCDCTL_API=3
    command:
      - etcd
      - --name=etcd3
      - --data-dir=/etcd-data
      - --advertise-client-urls=http://0.0.0.0:2379
      - --listen-client-urls=http://0.0.0.0:2379
      - --listen-peer-urls=http://0.0.0.0:2380
      - --initial-cluster-token=etcd-cluster
      - --initial-cluster=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380
      - --initial-cluster-state=new
    ports:
      - 23793:2379
      - 23803:2380
    volumes:
      - $PWD/etcd3-data:/etcd-data
    networks:
      - etcd-net

networks:
  etcd-net:
# 启动etcd集群
docker-compose up -d

# 查看集群成员列表
etcdctl --endpoints=http://127.0.0.1:23791 member list

# 查看成员状态
etcdctl --write-out=table --endpoints=http://127.0.0.1:23791 endpoint status


4.3 在多个节点安装etcd集群

一共三个节点,IP地址分别是192.168.3.101、192.168.3.102、192.168.3.103。

在192.168.3.101节点创建docker-compose.yml文件,内容如下:

version: "3.7"
  
services:
  etcd:
    image: quay.io/coreos/etcd
    container_name: my-etcd
    restart: always
    environment:
      - ETCDCTL_API=3
    command:
      - etcd
      # 成员
      - --name=etcd1
      - --data-dir=/etcd-data
      - --listen-peer-urls=http://0.0.0.0:2380
      - --listen-client-urls=http://0.0.0.0:2379
      # 集群
      - --initial-advertise-peer-urls=http://192.168.3.101:2380
      - --advertise-client-urls=http://192.168.3.101:2379
      - --initial-cluster-token=cluster-token
      - --initial-cluster=etcd1=http://192.168.3.101:2380,etcd2=http://192.168.3.102:2380,etcd3=http://192.168.3.103:2380
      - --initial-cluster-state=new
    volumes:
      - $PWD/etcd-data:/etcd-data
    ports:
      - 2379:2379
      - 2380:2380
    network_mode: "host"
    stdin_open: true
    tty: true


在192.168.3.102节点创建docker-compose.yml文件,内容如下:

version: "3.7"
  
services:
  etcd:
    image: quay.io/coreos/etcd
    container_name: my-etcd
    restart: always
    environment:
      - ETCDCTL_API=3
    command:
      - etcd
      # 成员
      - --name=etcd2
      - --data-dir=/etcd-data
      - --listen-peer-urls=http://0.0.0.0:2380
      - --listen-client-urls=http://0.0.0.0:2379
      # 集群
      - --initial-advertise-peer-urls=http://192.168.3.102:2380
      - --advertise-client-urls=http://192.168.3.102:2379
      - --initial-cluster-token=cluster-token
      - --initial-cluster=etcd1=http://192.168.3.101:2380,etcd2=http://192.168.3.102:2380,etcd3=http://192.168.3.103:2380
      - --initial-cluster-state=new
    volumes:
      - $PWD/etcd-data:/etcd-data
    ports:
      - 2379:2379
      - 2380:2380
    network_mode: "host"
    stdin_open: true
    tty: true


在192.168.3.103节点创建docker-compose.yml文件,内容如下:

version: "3.7"
  
services:
  etcd:
    image: quay.io/coreos/etcd
    container_name: my-etcd
    restart: always
    environment:
      - ETCDCTL_API=3
    command:
      - etcd
      # 成员
      - --name=etcd3
      - --data-dir=/etcd-data
      - --listen-peer-urls=http://0.0.0.0:2380
      - --listen-client-urls=http://0.0.0.0:2379
      # 集群
      - --initial-advertise-peer-urls=http://192.168.3.103:2380
      - --advertise-client-urls=http://192.168.3.103:2379
      - --initial-cluster-token=cluster-token
      - --initial-cluster=etcd1=http://192.168.3.101:2380,etcd2=http://192.168.3.102:2380,etcd3=http://192.168.3.103:2380
      - --initial-cluster-state=new
    volumes:
      - $PWD/etcd-data:/etcd-data
    ports:
      - 2379:2379
      - 2380:2380
    network_mode: "host"
    stdin_open: true
    tty: true


三个节点的etcd运行脚本不一样的地方只有三个启动参数,分别是–name、–initial-advertise-peer-urls、–advertise-client-urls。

# 分别在3个节点启动etcd
docker-compose up -d

# 查看集群成员列表
export ENDPOINTS=192.168.3.101:2379,192.168.3.102:2379,192.168.3.103:2379
etcdctl --endpoints=$ENDPOINTS member list

# 查看成员状态
etcdctl --write-out=table --endpoints=$ENDPOINTS endpoint status


4.4 TLS加密通信

如果集群需要使用TLS协议进行的加密通信,又要验证其身份,需要添加自签名证书,生成自签名证书脚本文件gen-peer-certs.sh如下:

#!/bin/bash


# -------------------------------- 参数判断 --------------------------------------------
params=$@

if [ $# -lt 1 ]; then
    echo "param is empty"
    echo "usage: $0 [domain ...] | [ip ...]"
    echo "eg: $0 zhuyasen.com 192.168.3.100"
    echo ""
    exit
fi

# 用参数替换req-csr.json固定字段值domains_or_ips
hostFields=''
for val in $params; do
    hostFields=${hostFields}\\\"$val\\\",
done
# 去掉最后一个逗号
hostFields=${hostFields%?}


# ---------------------------------- 创建认证中心(CA) ----------------------------------
# 创建存储证书目录和配置目录
mkdir -p certs
cd certs
mkdir -p config

# 创建根CA证书和私钥的CSR(证书签名请求文件)配置文件ca-csr.json
cat > config/ca-csr.json <<EOF
{
    "CN": "myMechanism",
    "key": {
        "algo": "rsa",
        "size": 2048
    },
    "names": [
        {
            "C": "CN",
            "ST": "Beijing",
            "L": "Beijing",
            "O": "my organization",
            "OU": "my organization unit name"
        }
    ]
}
EOF

# 生成CA证书和私钥,先判断ca-key.pem是否存在,如果存在则使用已存在的
if [[ ! -f "ca-key.pem" ]]; then
  echo "generate new ca.pem and ca-key.pem"
  cfssl gencert -initca config/ca-csr.json | cfssljson -bare ca -
else
  echo "use existing ca-key.pem."
fi


# ---------------------------------- 派发证书  ----------------------------------
# 创建证书签名请求配置文件req-csr.json,这里填写申请组织信息、ip或域名,注:当集群使用tls认证,并且只使用ip访问时,必须添加本地ip进来,外部访问才能通过鉴权,否则只能本地访问
cat > config/req-csr.json <<EOF
{
    "CN": "etcd",
    "hosts": [
        "localhost",
        "127.0.0.1",
        "domains_or_ips"
    ],
    "key": {
        "algo": "rsa",
        "size": 2048
    },
    "names": [
        {
            "C": "CN",
            "ST": "GuangDong",
            "L": "GuangZhou",
            "O": "communication",
            "OU": "cluster"
        }
    ]
}
EOF


# 配置证书生成策略,让CA知道颁发什么样的证书。
cat > config/ca-config.json <<EOF
{
    "signing":{
        "default":{
            "expiry":"87600h"
        },
        "profiles":{
            "peer":{
                "expiry":"8760h",
                "usages":[
                    "signing",
                    "key encipherment",
                    "server auth",
                    "client auth"
                ]
            }
        }
    }
}
EOF


tagName='\"domains_or_ips\"'
# 把domains_or_ips替换为输入的参数
sed -i "s/$tagName/$hostFields/g" config/req-csr.json

# 使用现有证书和私钥重新颁发新的证书,类型为服务端使用
for name in $params; do
  cfssl gencert -ca=ca.pem -ca-key=ca-key.pem -config=config/ca-config.json -profile=peer config/req-csr.json | cfssljson -bare $name
  cfssl gencert -ca=ca.pem -ca-key=ca-key.pem -config=config/ca-config.json -profile=peer config/req-csr.json | cfssljson -bare peer-$name
done

# 删除多余文件
ls | grep -v pem | xargs -i rm -rf {}


# 执行脚本生成证书,参数为各个节点ip地址,在当前certs目录下
./gen-peer-certs.sh 192.168.3.101 192.168.3.102 192.168.3.103

# 验证生成的证书是否有效
cd certs
openssl verify -CAfile ca.pem 192.168.3.101.pem
openssl verify -CAfile ca.pem peer-192.168.3.101.pem

# 在docker-compose.yml添加共享目录,把证书复制到容器里
    volumes:
      - $PWD/certs:/etcd-certs

# 在docker-compose.yml的etcd启动命令中添加证书参数,每个节点都需要添加
    command:
      # client端证书
      - --client-cert-auth
      - --trusted-ca-file=/etcd-certs/ca.pem
      - --cert-file=/etcd-certs/192.168.3.xxx.pem
      - --key-file=/etcd-certs/192.168.3.xxx-key.pem
      # peer端证书
      - --peer-client-cert-auth
      - --peer-trusted-ca-file=/etcd-certs/ca.pem
      - --peer-cert-file=/etcd-certs/peer-192.168.3.xxx.pem
      - --peer-key-file=/etcd-certs/peer-192.168.3.xxx-key.pem

# 连接客户端需要认证参数
--cacert --cert --key这三个参数都不能缺少,其中--cert --key这两个参数可以是任意一个节点的客户端证书和私钥

# 示例
etcdctl --endpoints=$ENDPOINTS \
--cacert=ca.pem \
--cert=192.168.3.101.pem \
--key=192.168.3.101-key.pem \
member list

# 为了避免每次都输入一大串参数,在~/.bashrc添加下面别名
export ETCDCTL_API=3
HOST_1=192.168.3.101:23791
HOST_2=192.168.3.102:23792
HOST_3=192.168.3.103:23793
ENDPOINTS=$HOST_1,$HOST_2,$HOST_3
alias etcdctl="etcdctl --endpoints=$ENDPOINTS"
certPath=/home/zhuyasen/work/etcd/etcd-cluster-local/certs
alias etcdctlcert="etcdctl --endpoints=$ENDPOINTS --cacert=${certPath}/ca.pem --cert=${certPath}/192.168.3.101.pem --key=${certPath}/192.168.3.101-key.pem"

# 刷新生效
source ~/.bashrc

# 使用
etcdctlcert member list

注:如果是在本地一台主机使用docker搭建的需要tls鉴权认证的etcd集群,局域网内其他主机想要通过tls鉴权访问集群,必须把etcd集群所在主机的ip地址填写到req-csr.json配置文件的hosts字段下,否则会报错authentication handshake failed: x509: certificate is valid for 127.0.0.1, not 192.168.3.5(集群所在主机的ip地址)


4.5 etcd集群性能压测

定义性能的两个因素是:延迟和吞吐量,延迟是完成操作所花费的时间。吞吐量是一段时间内完成的全部操作。

# 下载压测工具(需要代理),需要先安装go才可以下载编译
set https_proxy=http://127.0.0.1:10809
set http_proxy=http://127.0.0.1:10809
go get -v go.etcd.io/etcd/tools/benchmark

# 进去目录$GOPATH/src/go.etcd.io/etcd/tools/benchmark
go build
sudo mv benchmark /usr/local/bin


设置环境变量

set HOST_1=192.168.3.101:2379
set HOST_2=192.168.3.102:2379
set HOST_3=192.168.3.103:2379

# 获取主节点(IS LEADER为true)
etcdctl endpoint status --endpoints=${HOST_1},${HOST_2},${HOST_3}
# 得知主节点为HOST_3


(1) 写入压测

# 压测写入主节点(多用户)
benchmark --endpoints=${HOST_1} --target-leader  --conns=100 --clients=1000 \
    put --key-size=8 --sequential-keys --total=100000 --val-size=256
# 结果: 吞吐量65985.5875 req/s,平均延时14.8毫秒

# 压测写入所有成员(多用户)
benchmark --endpoints=${HOST_1},${HOST_2},${HOST_3} --conns=100 --clients=1000 \
    put --key-size=8 --sequential-keys --total=100000 --val-size=256
# 结果: 吞吐量62264.1271 req/s,平均延时15.7毫秒


(2) 读取压测

# 线性化(linearizabe)读取数据
benchmark --endpoints=${HOST_1},${HOST_2},${HOST_3} --conns=100 --clients=1000 range foo --consistency=l --total=100000
# 结果: 吞吐量103923.3802 req/s,平均延时9毫秒

# 串行化(serializabe)读取数据
benchmark --endpoints=${HOST_1},${HOST_2},${HOST_3} --conns=100 --clients=1000 range foo --consistency=s --total=100000
# 结果: 吞吐量115904.776 req/s,平均延时8.1毫秒



5 etcdctl常用命令

etcdctl是一个命令行的客户端,它提供了简洁的命令,可理解为命令工具集,可以方便我们在对服务进行测试或者手动修改数据库内容。etcdctl与kubectl、systemctl命令原理及操作类似。

用法:

etcdctl [global options] command [command options][args…]

安装etcdctl:

# 方式一:从github官网下载 https://github.com/etcd-io/etcd/releases

# 方式二:从运行的docker中复制到本地
sudo docker cp etcd容器ID或名称:/usr/local/bin/etcdctl /usr/local/bin


etcd 在键的组织上采用了层次化的空间结构(类似于文件系统中目录的概念),数据库操作围绕对键值和目录的 CRUD 增删改查完整生命周期的管理。

具体的命令选项参数可以通过 etcdctl command –help来获取相关帮助,下面都是V3版本命令。

指定etcd集群,在~/.bashrc添加下面内容

export ETCDCTL_API=3
HOST_1=127.0.0.1:23791
HOST_2=127.0.0.1:23792
HOST_3=127.0.0.1:23793
ENDPOINTS=$HOST_1,$HOST_2,$HOST_3
# 覆盖etcdctl命令,如果需要使用原生命令,可以在命令开头加一个\反斜线,例如:\etcdctl xxxx xxxx
alias etcdctl="etcdctl --endpoints=$ENDPOINTS"
alias etcdctljson="etcdctl --endpoints=$ENDPOINTS --write-out=json"
alias etcdctltable="etcdctl --endpoints=$ENDPOINTS --write-out=table"

刷新生效:

source ~/.bashrc


KV API的操作有下面保证:

  • 原子性,所有API请求都是原子请求,一个操作要么完全完成,要么根本不完成。对于监视请求,由一个操作生成的所有事件将在一个监视响应中,Watch从不观察单个操作的部分事件。
  • 耐用性,任何完成的操作都是持久的,所有可访问的数据也是持久数据,读取将永远不会返回尚未持久化的数据。
  • 严格的可序列化性 ,这是分布式事务数据库系统的最强隔离保证,读操作将永远不会观察到任何中间数据。


5.1 增删改查数据相关命令

增加和修改,如果存在则替换

etcdctl put <键名> <键值> [选项]

# 示例
etcdctl put key "Hello ETCD"
etcdctl put key1 "Hello ETCD 1"
etcdctl put leaseKey "alive value" --lease=12f775cb02d34d94 # 有生命周期的key


查询

etcdctl get <键名> [选项]

# 示例:
etcdctl get key
etcdctl get key --prefix # 相同前缀查找
etcdctl get / --prefix --keys-only # 只获取/开始的所有key,不包括值


删除

etcdctl del <键名> [选项]

# 示例:
etcdctl del key
etcdctl get key --prefix # 相同前缀删除


5.2 集群状态相关命令

查看集群状态

etcdctl endpoint status --write-out=table

# +-----------------+------------------+---------+---------+-----------+-----------+------------+
# |    ENDPOINT     |        ID        | VERSION | DB SIZE | IS LEADER | RAFT TERM | RAFT INDEX |
# +-----------------+------------------+---------+---------+-----------+-----------+------------+
# | 127.0.0.1:23791 | ade526d28b1f92f7 |   3.3.8 |   22 MB |     false |        10 |       9177 |
# | 127.0.0.1:23792 | d282ac2ce600c1ce |   3.3.8 |   22 MB |     false |        10 |       9177 |
# | 127.0.0.1:23793 | bd388e7810915853 |   3.3.8 |   22 MB |      true |        10 |       9177 |
# +-----------------+------------------+---------+---------+-----------+-----------+------------+


查看集群健康状态

etcdctl endpoint health

# 127.0.0.1:23793 is healthy: successfully committed proposal: took = 583.669µs
# 127.0.0.1:23792 is healthy: successfully committed proposal: took = 710.885µs
# 127.0.0.1:23791 is healthy: successfully committed proposal: took = 734.486µs


5.3 集群成员操作相关命令

查看集群成员列表

etcdctl member list --write-out=table

# +------------------+---------+-------+-------------------+---------------------+
# |        ID        | STATUS  | NAME  |    PEER ADDRS     |    CLIENT ADDRS     |
# +------------------+---------+-------+-------------------+---------------------+
# | ade526d28b1f92f7 | started | etcd1 | http://etcd1:2380 | http://0.0.0.0:2379 |
# | bd388e7810915853 | started | etcd3 | http://etcd3:2380 | http://0.0.0.0:2379 |
# | d282ac2ce600c1ce | started | etcd2 | http://etcd2:2380 | http://0.0.0.0:2379 |
# +------------------+---------+-------+-------------------+---------------------+


添加成员

etcdctl member add <成员名称> [--peer-urls=节点地址]

# 示例:将目标节点etcd4添加到集群
etcdctl member add etcd4 http://192.168.3.104:2380
# 启动目标集群时需要设置启动参数如下
etcd --name=etcd4 --data-dir=/etcd-data \
  --listen-peer-urls=http://192.168.3.104:2380 \
  --listen-client-urls=http://192.168.3.104:2379 \
  --initial-advertise-peer-urls=http://192.168.3.104:2380 \
  --advertise-client-urls=http://192.168.3.104:2379 \
  --initial-cluster=etcd1=http://192.168.3.101:2380,etcd2=http://192.168.3.102:2380,etcd3=http://192.168.3.103:2380,etcd4=http://192.168.3.104:2380 \
  --initial-cluster-state=existing


更新成员

新成员必须启动的,类似添加成员

etcdctl member update <成员id> [--peer-urls=节点地址]

  # 示例:
  etcdctl member update ade526d28b1f92f7 --peer-urls=http://192.168.3.111:2380


删除成员

etcdctl member remove <成员id>

# 示例
etcdctl member remove ade526d28b1f92f7

5.4 租约相关命令

租约具有生命周期,需要为租约授予一个TTL(time to live),将租约绑定到一个key上,则key的生命周期与租约一致,可续租,可撤销租约。

etcdctl lease

etcdctl lease 子命令有:

  • grant: 添加新租约
  • revoke: 删除租约
  • list: 列出所有租约
  • timetolive: 获取租约详情信息
  • keep-alive: 保持租约有效(续签)


# 生成一份新租约
etcdctl lease grant 600

# 查看租约列表
etcdctl lease list

# 查看租约的剩余生命时长,可以使用json输出得到字段值
etcdctl lease timetolive 12f775cb02d34d94
etcdctl lease timetolive 12f775cb02d34d94 --keys # 查看已绑定的key

# 撤销租约,绑定租约的key也会自动失效
etcdctl lease revoke 12f775cb02d34d94

# 续租
etcdctl lease keep-alive 12f775cb02d34d94 # 持续续租,无过期(阻塞)
etcdctl lease keep-alive 12f775cb02d34d94 --once # 将保持存活时间重置为其原始值并立即退出

# key绑定租约
etcdctl put leaseKey "alive value" --lease=12f775cb02d34d94


5.5 watch命令

watch是监听键或前缀发生改变的事件流。

# 对某个key监听操作,当/key1发生改变时,会返回最新值
etcdctl watch /key1

# 监听key前缀
etcdctl watch /key --prefix

# 监听到改变后执行相关操作
etcdctl watch /key1 -- etcdctl member list

5.6 事务txn

txn从标准输入读取多个etcd请求,并将它们作为单个原子事务应用。 交易由条件清单组成,如果所有条件都为真,则应用请求列表;如果任何条件为假,则不应用请求列表。

etcdctl txn [options]

# 设置key值
etcdctl put name zhangsan
etcdctl put age 22

# 交互式事务
etcdctl txn -i
# ---------------- 进入终端交互式 ----------------
compares:
value("age") > "18" # 条件清单1
value("name") = "zhangsan" # 条件清单2

success requests (get, put, del):
put result true  # 所有条件成立执行命令

failure requests (get, put, del):
put result false  # 至少有一个条件清单不成立执行命令

SUCCESS

OK
# ---------------- 结束终端交互式 ----------------

# 查看事务执行结果
etcdctl get result
  # result
  # true


5.7 分布式锁

分布式锁,多个客户端同时抢锁,抢到锁可以操作,其他没有获取到锁的会等待阻塞状态,等锁释放之后才可以获取到锁。

etcdctl lock [options] [exec-command arg1 arg2 …]

# 在第一个终端
etcdctl lock mutexKey
  # mutexKey/326963a02758b52d
​
# 在第二终端
etcdctl lock mutexKey
​
# 当第一个终端结束了,第二个终端会显示
mutexKey/326963a02758b531

注:只有当正常退出且释放锁后,lock命令的退出码是0,否则这个锁会一直被占用直到过期。


5.7 备份和恢复命令

# 快照
etcdctl snapshot save backup.db

# 查看快照文件信息
etcdctl snapshot status backup.db --write-out=table

# 恢复快照
etcdctl snapshot restore backup.db \
--name=etcd1 \
--data-dir=xxx \
--initial-advertise-peer-urls=xxx \
--initial-cluster=xxx \
--initial-cluster-token=xxx


5.8 查看警报命令

如果内部出现问题,会触发警报,可以通过命令查看警报引起原因。

# 查看所有警报
etcdctl alarm list

# 解除所有警报
etcdctl alarm disarm


5.9 用户和角色相关命令

etcd默认是没有开启访问控制的,如果开启外网访问etcd的话就需要考虑访问控制的问题,etcd提供了两种访问控制的方式:

  • 基于身份验证的访问控制
  • 基于证书的访问控制

etcd有一个特殊用户root和一个特殊角色root:

  • root用户:root用户是etcd的超级管理员,拥有etcd的所有权限,在开启角色认证之前为们必须要先建立好root用户。
  • root角色:具有该root角色的用户既具有全局读写访问权限,具有更新集群的身份验证配置的权限。此外,该root角色还授予常规集群维护的特权,包括修改集群成员资格,对存储进行碎片整理以及拍摄快照。

注:root用户必须拥有root角色之后,root用户才允许在操作etcd的所有东西。


etcd的权限资源:

  • Users: user用来设置身份认证(user:passwd),一个用户可以拥有多个角色,每个角色被分配一定的权限(只读、只写、可读写),用户分为root用户和非root用户。
  • Roles: 角色用来关联权限,角色主要三类:root角色。默认创建root用户时即创建了root角色,该角色拥有所有权限;guest角色,默认自动创建,主要用于非认证使用。普通角色,由root用户创建角色,并分配指定权限。
  • Permissions: 权限分为只读、只写、可读写三种权限,权限即对指定目录或key的读写权限。

注意:如果没有指定任何验证方式,即没显示指定以什么用户进行访问,那么默认会设定为 guest 角色。默认情况下 guest 也是具有全局访问权限的。


管理用户的子命令

etcdctl user

etcdctl user 子命令有:

  • add: 添加新用户
  • delete: 删除用户
  • get: 获取用户的详细信息
  • list: 列出所有用户
  • passwd: 修改用户密码
  • grant-role: 授予用户角色
  • revoke-role: 撤销用户的角色


管理角色的子命令

etcdctl role

etcdctl role 子命令有:

  • add: 添加新角色
  • delete: 删除角色
  • get: 获取角色的详细信息
  • list: 列出所有角色
  • grant-permission: 把key操作权限授予给一个角色
  • revoke-permission: 从角色中撤销key操作权限


开启root身份验证

开启了身份验证之后,所有etcdctl命令操作都需要指定用户参数–user,参数值为用户名:密码,类似开启了证书访问控制之后,所有etcdctl命令需要添加证书参数–cacert.

# 创建root后,root用户默认拥有类似linux一样超级管理员权限,添加用户root后默认还有root角色
etcdctl user add root
  # Password of root: 123456
  # Type password of root again for confirmation: 123456

# 开启身份验证,如果取消把enable改为disable
etcdctl auth enable

# 操作时必须指定用户,否则会报错
etcdctl put key "hello etcd" --user=root:123456
etcdctl get key --user=root:123456


新用户和角色授权

开启了root身份验证之后,就可以对普通用户和角色操作了。

(1) 用户增删改查

# 添加新用户zhangsan
etcdctl user add zhangsan --user=root:123456
  # Password of root: 123456
  # Type password of root again for confirmation: 123456

# 获取用户的详细信息
etcdctl user get zhangsan --user=root:123456

# 查看所有用户
etcdctl user list --user=root:123456

# 修改用户密码
etcdctl user passwd zhangsan --user=root:123456

# 删除用户
etcdctl user delete zhangsan --user=root:123456


(2) 角色增删改查

# 添加新角色redis
etcdctl role add redis --user=root:123456

# 获取角色的详细信息
etcdctl role get redis --user=root:123456

# 查看所有角色
etcdctl role list --user=root:123456

# 删除角色
etcdctl role delete redis --user=root:123456


(3) 绑定和授权

有了新用户和新角色之后,还需要把用户和角色绑定在一起,确定授权权限之后,新用户才可以对key有对应操作权限

# 授予角色redis权限,可以设置只读(read)、只写(write)、读写(readwrite)
etcdctl role grant-permission redis readwrite redisKey/ --user=root:123456
# 或授予key前缀 etcdctl role grant-permission redis readwrite redisKey/  --prefix=true  --user=root:123456

# 用户zhangsan绑定redis角色,获得操作权限
etcdctl user grant-role zhangsan redis --user=root:123456


# 下面是用户zhangsan在授权前后操作redisKey/
  xxxx@pc:~$ etcdctl put redisKey/ "hello redis" --user=root:123456
    # OK
  xxxx@pc:~$ etcdctl get redisKey/ --user=root:123456
    # redisKey/
    # hello redis
  xxxx@pc:~$ etcdctl get redisKey/ --user=zhangsan:123456
    # Error: etcdserver: permission denied
  xxxx@pc:~$ etcdctl role grant-permission redis readwrite redisKey/ --user=root:123456
    # Role redis updated
  xxxx@pc:~$ etcdctl get redisKey/ --user=zhangsan:123456
    # Error: etcdserver: permission denied
  xxxx@pc:~$ etcdctl user grant-role zhangsan redis --user=root:123456
    # Role redis is granted to user zhangsan
  xxxx@pc:~$ etcdctl get redisKey/ --user=zhangsan:123456
    # redisKey/
    # hello redis


# 撤回角色redis对redisKey/的操作权限
etcdctl role revoke-permission redis redisKey/ --user=root:123456

# 解绑用户zhangsan和角色redis,也就是用户zhangsan操作权限(redis角色的权限)被收回
etcdctl user revoke-role zhangsan redis --user=root:123456



6 etcd的go客户端

6.1 安装

不要直接使用 go get -u go.etcd.io/etcd 命令安装etcd客户端,可会遇到些奇怪问题,直接从github下载稳定版本 https://github.com/etcd-io/etcd/archive/v3.4.13.zip

# 在$GOPATH/src下创建目录go.etcd.io

# 解压文件v3.4.13.zip,并把目录名称改为etcd,然后把整个etcd目录移动到$GOPATH/src/go.etcd.io/目录下即可


6.2 连接etcd服务

(1) 简单连接

func InitETCD(endPoints []string) (*clientv3.Client, error) {
    // 配置
    config := clientv3.Config{
        Endpoints:   endPoints,
        DialTimeout: 10 * time.Second,
    }

    // 连接
    return clientv3.New(config)
}

/* 调用
    cli, err := InitETCD([]string{"192.168.3.5:2379"})
    if err != nil {
        panic(err)
    }
*/


(2) 有tls身份验证连接

// InitETCDWithTLS 连接需要认证的etcd
func InitETCDWithTLS(endPoints []string, caFile, certFile, keyFile string) (*clientv3.Client, error) {
    tlsInfo := transport.TLSInfo{
        TrustedCAFile: caFile,
        CertFile:      certFile,
        KeyFile:       keyFile,
    }
    tlsConfig, err := tlsInfo.ClientConfig()
    if err != nil {
        return nil, err
    }

    // 配置etcd
    config := clientv3.Config{
    Endpoints: endPoints,
        DialTimeout: 5 * time.Second,
        TLS:       tlsConfig,
    }

    return clientv3.New(config)
}

/*
    endPoints := []string{"192.168.3.5:23791", "192.168.3.5:23792", "192.168.3.5:23793"}
    caFile := "D:\\certs\\ca.pem"
    certFile := "D:\\certs\\etcd1.pem"
  keyFile := "D:\\certs\\etcd1-key.pem"
  
    cli, err := InitETCDWithTLS(endPoints, caFile, certFile, keyFile)
    if err != nil {
        panic(err)
    }
*/


6.3 增删改查数据

(1) 增加和修改

    putResp, err := cli.KV.Put(context.Background(), "/user/zhangsan", "v5")
    if err != nil {
        fmt.Println(err)
        return
    }
    fmt.Println("global version is", putResp.Header.Revision)


  // 增加或修改时返回上一个版本值和版本号
    putResp, err := cli.KV.Put(context.Background(), "/user/zhangsan", "v6", clientv3.WithPrevKV())
    if err != nil {
        fmt.Println(err)
        return
    }
    fmt.Println("global version is", putResp.Header.Revision)
    if putResp.PrevKv != nil && string(putResp.PrevKv.Value) != "" {
        fmt.Printf("preVal=%s, preVersion=%d\n",
            string(putResp.PrevKv.Value), // 上一个版本值
            putResp.PrevKv.Version,       // 上一个历史版本号
        )
    }

(2) 查询

  key:="job1"

  // ops为空时,只查询单个key
    ops := []clientv3.OpOption{
        // 前缀查询
        //clientv3.WithPrefix(),

        // 范围查询
        //clientv3.WithRange("/job4"),

        // 查询结果排序,查询结果可以按Key、Value、CreateRevision(key创建的版本)、Version(key历史版本数量)、ModRevision(key对应最新版本)排序,可以升序(Ascend)或降序(Descend)。
        //clientv3.WithSort(clientv3.SortByVersion, clientv3.SortDescend),

        // 获取数量
    //clientv3.WithCountOnly(), // 只返回数量,不返回kv,可以判断指定key是否存在,可以用来统计key数量

        // 实现翻页功能
        //clientv3.WithFromKey(),
        //clientv3.WithLimit(3),
    }

    getResp, err := cli.KV.Get(context.Background(), key, ops...)
    if err != nil {
        fmt.Println(err)
        return
    }
    for i, v := range getResp.Kvs {
        fmt.Printf("(%d) %s -> %s, historySize=%d, createRev=%d, modRev=%d,headRev=%d\n",
            i,
            string(v.Key),
            string(v.Value),
            v.Version,               // 本key一共有多少个历史版本
            v.CreateRevision,        // 第一次创建所在版本号
            v.ModRevision,           // 最新更改版本号
            getResp.Header.Revision, // 全局最大版本号
        )
    }


(3) 删除

  // ops为空时,只删除个key
    ops := []clientv3.OpOption{
    // 删除匹配前缀所有的key
  //clientv3.WithPrefix(),
  }

    delResp, err := cli.KV.Delete(context.Background(), key, ops...)
    if err != nil {
        fmt.Println(err)
        return
  }
  
    fmt.Sprintln("deleted count: ", delResp.Deleted) // 删除了多少个key
    fmt.Sprintln("PrevKvs : ", delResp.PrevKvs)      // 删除了哪些key
  }


6.4 事务txn

    key := "num"

    _, err := cli.KV.Put(context.Background(), key, "05")
    if err != nil {
        panic(err)
    }

    // 链式操作
    txnResp, err := cli.KV.Txn(context.Background()).
        If(clientv3.Compare(clientv3.Value(key), ">", "10")). // 条件,可以多个clientv3.Cmp
        Then(clientv3.OpPut(key, "10")).                      // 所有条件成立,执行的操作,可以多个clientv3.Op
        Else(clientv3.OpPut(key, "0")).                       // 至少一个条件不成立,执行的操作,可以多个clientv3.Op
        Commit()
    if err != nil {
        panic(err)
    }

    if txnResp.Succeeded {
        fmt.Println("txn success")
    } else {
        fmt.Println("txn failed")
    }


6.5 租约lease

(1) 生成租约

    // 生成一个新的租约(单位秒)
    grantResp, err := cli.Grant(context.Background(), 300)
    if err != nil {
        panic(err)
    }
    fmt.Println("lease id is", grantResp.ID, "or", strconv.FormatInt(int64(grantResp.ID), 16))

    // 把新的租约绑定到key,租约过期后自动删除
    _, err = cli.KV.Put(context.Background(), "foo", "bar", clientv3.WithLease(grantResp.ID))
    if err != nil {
        panic(err)
    }

    // 只按原来租约时间续签一次
    kao, kaerr := cli.KeepAliveOnce(context.Background(), grantResp.ID)
    if kaerr != nil {
        panic(err)
    }
    fmt.Println("ttl:", kao.TTL)

    // 持续续签租约,直到程序运行结束
    kaResp, err := cli.KeepAlive(context.Background(), grantResp.ID)
    if err != nil {
        panic(err)
    }
    ka := <-kaResp // 通过管道获取返回信息,缓冲管道
    fmt.Println(ka.ID, ka.TTL)

    // 撤销租约,会使绑定租约的key立即删除
    time.Sleep(time.Second * 30)
    _, err = cli.Revoke(context.Background(), grantResp.ID)
    if err != nil {
        panic(err)
    }
  fmt.Printf("revoke lease(%d) success", grantResp.ID)
  
    // 查看租约还有多久过期,从而知道绑定的key什么时候过期
    ttlResp, err := cli.TimeToLive(context.Background(), grantResp.ID, clientv3.WithAttachedKeys())
    if err != nil {
        panic(err)
    }
    fmt.Println(
        ttlResp.GrantedTTL, // 租约总时长(秒)
        ttlResp.TTL,        // 剩余时长(秒)
        string(bytes.Join(ttlResp.Keys, []byte(","))), // 绑定的key
    )


6.7 监听watch

watch监听put和delete事件,可以使用ctx取消监听。

    // 监听一个key
    go func() {
        wChan := cli.Watch(context.Background(), "/user/zhangsan") // 对key监听
        for wResp := range wChan {                                 // 监听key值是否有变化(一直阻塞),也可以使用for select来获取管道信息,结合ctx来控制是否退出监听
            for _, event := range wResp.Events { // 查看事件,根据事件类型(修改、删除)做出相应处理
                fmt.Printf("%s %q : %q\n", event.Type, event.Kv.Key, event.Kv.Value)
            }
        }
    }()

    // 监听key前缀
    go func() {
        ctx, _ := context.WithCancel(context.Background())
        wChan := cli.Watch(ctx, "/user/", clientv3.WithPrefix()) // 对适配前缀所有key监听
        for {
            select {
            case wResp := <-wChan:
                for _, event := range wResp.Events { // 查看事件,根据事件类型(修改、删除)做出相应处理
                    fmt.Printf("%s %q : %q\n", event.Type, event.Kv.Key, event.Kv.Value)
                }
            case <-ctx.Done():
                fmt.Println(ctx.Err())
                return
            }
        }
    }()

    // 不管key有没有更新,etcd会每个10分钟发送一次通知事件
    go func() {
        wChan := cli.Watch(context.Background(), "foo", clientv3.WithProgressNotify())
        for wResp := range wChan {
            for _, event := range wResp.Events {
                fmt.Printf("%s %q : %q\n", event.Type, event.Kv.Key, event.Kv.Value)
            }
            fmt.Printf("wResp.Header.Revision: %d\n", wResp.Header.Revision)
            fmt.Println("wResp.IsProgressNotify:", wResp.IsProgressNotify())
        }
    }()

    // key范围监听,不包括最大值
    go func() {
        wChan := cli.Watch(context.Background(), "job1", clientv3.WithRange("job3"))
        for wResp := range wChan {
            for _, event := range wResp.Events {
                fmt.Printf("%s %q : %q\n", event.Type, event.Kv.Key, event.Kv.Value)
            }
        }
    }()


6.8 实现分布式锁示例

package main

import (
    "context"
    "fmt"
    "time"

    "go.etcd.io/etcd/clientv3"
)

func main() {
    cli, err := InitETCD([]string{"192.168.3.5:2379"})
    if err != nil {
        panic(err)
    }

    handle := func() {
        fmt.Println("处理业务中...")
        time.Sleep(10 * time.Second)
        fmt.Println("处理业务完毕")
    }

    DistributedLock(cli, handle)
}

// InitETCD 连接etcd
func InitETCD(endPoints []string) (*clientv3.Client, error) {
    // 配置
    config := clientv3.Config{
        Endpoints:   endPoints,
        DialTimeout: 10 * time.Second,
    }

    // 连接
    return clientv3.New(config)
}

// DistributedLock 分布式锁
func DistributedLock(cli *clientv3.Client, handle func()) {
    // 生成一个新的租约(单位秒)
    grantResp, err := cli.Grant(context.Background(), 5)
    if err != nil {
        return
    }
    fmt.Println("new lease id is", grantResp.ID)

    ctx, cancelFunc := context.WithCancel(context.Background())
    // 处理完业务后结束租约
    defer func() {
        cli.Revoke(context.Background(), grantResp.ID)
        cancelFunc()
    }()

    // 持续续租,直到处理完业务
    err = keepAlive(ctx, cli, grantResp.ID)
    if err != nil {
        fmt.Println(err)
        return
    }

    // 新建事务
    txn := cli.KV.Txn(context.Background())
    key := "/lock/mutex"
    // 判断key是否存在,不存在说明成功抢到锁
    txn.If(clientv3.Compare(clientv3.CreateRevision(key), "=", 0)).
        Then(clientv3.OpPut(key, "ok", clientv3.WithLease(grantResp.ID))). // 创建key并绑定租约
        Else(clientv3.OpGet(key))                                          // 否则抢锁失败
    txnResp, err := txn.Commit() // 提交事务
    if err != nil {
        fmt.Println(err)
        return
    }

    // 判断是否抢到了锁,true:抢锁成功,false:抢锁失败
    if !txnResp.Succeeded {
        fmt.Printf("锁(%s)已被占用:", key)
        return
    }

    // 处理业务
    handle()
}

// 持续续租
func keepAlive(ctx context.Context, cli *clientv3.Client, leaseID clientv3.LeaseID) error {
    // 自动续租
    keepRespChan, err := cli.KeepAlive(ctx, leaseID)
    if err != nil {
        return err
    }

    // 处理续约应答
    go func(ctx context.Context) {
        for {
            select {
            case _, ok := <-keepRespChan:
                if !ok {
                    return
                }
            case <-ctx.Done():
                return
            }
        }
    }(ctx)

    return nil
}



参考:



专题「数据库」的其它文章 »