etcd

etcd

etcd 是一个分布式键值对存储,设计用来可靠而快速的保存关键数据并提供访问。通过分布式锁,leader选举和写屏障(write barriers)来实现可靠的分布式协作。etcd集群是为高可用,持久性数据存储和检索而准备。

Etcd 是 CoreOS 基于 Raft 协议开发的分布式 key-value 存储,可用于服务发现、共享配置以及一致性保障(如数据库选主、分布式锁等)。

单机安装

下载地址 https://github.com/etcd-io/etcd/releases,解压后查看版本

etcd --version
etcdctl version

创建配置文件:conf.yaml

name: "hezebin-etcd"
data-dir: /usr/local/etcd/data
listen-client-urls: "http://127.0.0.1:2379"
advertise-client-urls: "http://127.0.0.1:2379"
initial-cluster-token: "hezebin-etcd-token"
initial-cluster: "hezebin-etcd=http://127.0.0.1:2380"
initial-advertise-peer-urls: "http://127.0.0.1:2380"
initial-cluster-state: "new"

启动 etcd 服务:

etcd --config-file=/usr/local/etcd/conf.yaml

打开终端,执行以下命令来检查 etcd 服务的健康状态:

etcdctl endpoint health

如果一切正常,您将看到类似以下的输出:

127.0.0.1:2379 is healthy: successfully committed proposal: took = 11.667µs

如果 etcd 服务正在运行且健康,您会收到健康状态的确认。

您还可以通过执行以下命令来查看 etcd 集群的成员状态:

etcdctl member list

这将列出 etcd 集群中的成员信息,包括成员的 ID、名称和状态。

设置键值对

etcdctl put name "hezebin"
etcdctl get name# 查询Etcd所有的key
etcdctl get --prefix ""# 只读取键 name 的值的命令
etcdctl get name --print-value-only# 以 name 为前缀的所有键的命令,结果数量限制为2:
etcdctl get --prefix --limit=2 name# 访问修订版本为 4 时的键的版本
etcdctl get --prefix --rev=4 name# 假设 etcd 集群已经有下列键:
a = 123
b = 456
z = 789
# 读取大于等于键 b 的 byte 值的键的命令:
etcdctl get --from-key b
# 清空数据
etcdctl del name
# 删除所有 n 前缀的节点
etcdctl del n -- prefix
# 删除键 zoo 并返回被删除的键值对的命令:
etcdctl del --prev-kv zoo

watch操作

watch 监测一个键值的变化,一旦键值发生更新,就会输出最新的值并退出

etcdctl watch name#更新键值
etcdctl put name "korbin"# watch 阻塞处返回结果
PUT 
name
korbin

读取键过往版本的值

应用可能想读取键的被替代的值。例如,应用可能想通过访问键的过往版本来回滚到旧的配置。或者,应用可能想通过多个请求来得到一个覆盖多个键的统一视图,而这些请求可以通过访问键历史记录而来。因为 etcd 集群上键值存储的每个修改都会增加 etcd 集群的全局修订版本,应用可以通过提供旧有的 etcd 修改版本来读取被替代的键。

假设 etcd 集群已经有下列键:

foo = bar         # revision = 2
foo1 = bar1       # revision = 3
foo = bar_new     # revision = 4
foo1 = bar1_new   # revision = 5

这里是访问键的过往版本的例子:

$ etcdctl get --prefix foo # 访问键的最新版本
foo
bar_new
foo1
bar1_new
$ etcdctl get --prefix --rev=4 foo # 访问修订版本为 4 时的键的版本
foo
bar_new
foo1
bar1
$ etcdctl get --prefix --rev=3 foo # 访问修订版本为 3 时的键的版本
foo
bar
foo1
bar1
$ etcdctl get --prefix --rev=2 foo # 访问修订版本为 2 时的键的版本
foo
bar
$ etcdctl get --prefix --rev=1 foo # 访问修订版本为 1 时的键的版本

压缩修订版本

如我们提到的,etcd 保存修订版本以便应用可以读取键的过往版本。但是,为了避免积累无限数量的历史数据,压缩过往的修订版本就变得很重要。压缩之后,etcd 删除历史修订版本,释放资源来提供未来使用。所有修订版本在压缩修订版本之前的被替代的数据将不可访问。

这是压缩修订版本的命令:

$ etcdctl compact 5
compacted revision 5
# 在压缩修订版本之前的任何修订版本都不可访问
$ etcdctl get --rev=4 foo
Error:  rpc error: code = 11 desc = etcdserver: mvcc: required revision has been compacted

注意: etcd 服务器的当前修订版本可以在任何键(存在或者不存在)以json格式使用get命令来找到。下面展示的例子中 mykey 是在 etcd 服务器中不存在的:

$ etcdctl get mykey -w=json
{"header":{"cluster_id":14841639068965178418,"member_id":10276657743932975437,"revision":15,"raft_term":4}}

lease租约(过期机制)

应用可以为 etcd 集群里面的键授予租约。当键被附加到租约时,它的存活时间被绑定到租约的存活时间,而租约的存活时间相应的被 time-to-live (TTL)管理。在租约授予时每个租约的最小TTL值由应用指定。租约的实际 TTL 值是不低于最小 TTL,由 etcd 集群选择。一旦租约的 TTL 到期,租约就过期并且所有附带的键都将被删除。

授予租约

# 授予租约,TTL为10秒
$ etcdctl lease grant 10
lease 32695410dcc0ca06 granted with TTL(10s)
# 附加键 foo 到租约32695410dcc0ca06
$ etcdctl put --lease=32695410dcc0ca06 foo barOK

应用通过租约 id 可以撤销租约。撤销租约将删除所有它附带的 key。

撤销租约

$ etcdctl lease revoke 32695410dcc0ca06
lease 32695410dcc0ca06 revoked
$ etcdctl get foo
# 空应答,因为租约撤销导致foo被删除

keepAlive续约

应用可以通过刷新键的 TTL 来维持租约,以便租约不过期。维持同一个租约的命令:

$ etcdctl lease keep-alive 32695410dcc0ca06
lease 32695410dcc0ca06 keepalived with TTL(10)
lease 32695410dcc0ca06 keepalived with TTL(10)
lease 32695410dcc0ca06 keepalived with TTL(10)
...

获取租约信息

应用程序可能想知道租约信息,以便可以更新或检查租约是否仍然存在或已过期。应用程序也可能想知道有那些键附加到了特定租约。

获取租约信息的命令:

$ etcdctl lease timetolive 694d5765fc71500b
lease 694d5765fc71500b granted with TTL(500s), remaining(258s)

获取租约信息和租约附带的键的命令:

$ etcdctl lease timetolive --keys 694d5765fc71500b
lease 694d5765fc71500b granted with TTL(500s), remaining(132s), attached keys([zoo2 zoo1])
# 如果租约已经过期或者不存在,它将给出下面的应答:
Error:  etcdserver: requested lease not found

事务

etcd 的事务(Transaction)机制允许你在一个原子操作中执行一系列操作,这些操作要么全部成功,要么全部失败,确保数据的一致性和完整性。

etcdctl txn 并没有对事务提供过多的支持,执行事务最好通过 go 来实现:

go get go.etcd.io/etcd/client/v3
package mainimport ("context""fmt""log""time""go.etcd.io/etcd/clientv3"
)func main() {// 创建 etcd 客户端cli, err := clientv3.New(clientv3.Config{Endpoints:   []string{"http://localhost:2379"}, // 替换为你的 etcd 地址DialTimeout: 5 * time.Second,})if err != nil {log.Fatal(err)}defer cli.Close()// 创建一个 etcd 事务etcdTxn := clientv3.NewKV(cli).Txn(context.Background())// 定义事务的比较和操作etcdTxn.If(clientv3.Compare(clientv3.Value("my_key"), "=", "10"),).Then(clientv3.OpPut("my_key", "20"),).Else(clientv3.OpPut("my_key", "30"),)// 提交事务txnResp, err := etcdTxn.Commit()if err != nil {log.Fatal(err)}// 检查事务是否成功if !txnResp.Succeeded {fmt.Println("Transaction failed")} else {fmt.Println("Transaction succeeded")}
}

注意:etcdTxn.If 中指定的条件是 clientv3.Compare(clientv3.Value("my_key"), "=", "10"),也就是检查键 “my_key” 的值是否等于 “10”。如果这个条件不满足(例如,键 “my_key” 的值为空),那么事务的 Else 分支会执行,而且整个事务会被标记为失败。

基于etcd实现分布式锁

原生实现

要在 etcd 中实现分布式锁,你可以利用 etcd 的事务特性和租约(Lease)机制。以下是一个使用 etcd 实现分布式锁的示例:

# 伪代码,etcdctl 并不支持下述命令,需要以 go 方式实现,api 更丰富
etcdctl txn -- \put my_lock some_value  --lease=0d7689bc575d6611  \if_not_exists# if_not_exists: 这是一个条件,表示只有在键不存在时才执行上述的 put 操作。这个条件确保只有第一次创建节点的时候才会成功,从而实现锁的获取。

这个命令的目的是在一个 etcd 事务中,尝试创建一个指定键名的键值对,如果该键名在 etcd 中尚不存在(即尝试获取锁),则创建成功,否则操作失败。这个操作模式可以帮助实现分布式锁的基本机制。

Go 代码实现:

package mainimport ("context""fmt""log""time""go.etcd.io/etcd/clientv3"
)func main() {// 创建 etcd 客户端cli, err := clientv3.New(clientv3.Config{Endpoints:   []string{"http://localhost:2379"}, // 替换为你的 etcd 地址DialTimeout: 5 * time.Second,})if err != nil {log.Fatal(err)}defer cli.Close()// 创建租约leaseResp, err := cli.Grant(context.Background(), 10) // 10 秒的租约时间if err != nil {log.Fatal(err)}// 锁的键名lockKey := "my_lock"// 尝试获取锁txnResp, err := cli.Txn(context.Background()).If(clientv3.Compare(clientv3.CreateRevision(lockKey), "=", 0)).Then(clientv3.OpPut(lockKey, "lock_holder", clientv3.WithLease(leaseResp.ID))).Commit()if err != nil {log.Fatal(err)}// 检查是否成功获取锁if !txnResp.Succeeded {log.Println("Failed to acquire lock")return}log.Println("Lock acquired")// 续约循环(保持锁)keepAliveCh, err := cli.KeepAlive(context.Background(), leaseResp.ID)if err != nil {log.Fatal(err)}go func() {for range keepAliveCh {// 续约成功,执行你的逻辑,或者也可以不做任何处理}}()// 在这里执行需要保护的临界区代码// 释放锁(取消续约)_, err = cli.Revoke(context.Background(), leaseResp.ID)if err != nil {log.Fatal(err)}log.Println("Lock released")
}

官方 concurrency 包实现

cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
if err != nil {log.Fatal(err)
}
defer cli.Close()// 创建两个单独的会话用来演示锁竞争
s1, err := concurrency.NewSession(cli)
if err != nil {log.Fatal(err)
}
defer s1.Close()
m1 := concurrency.NewMutex(s1, "/my-lock/")s2, err := concurrency.NewSession(cli)
if err != nil {log.Fatal(err)
}
defer s2.Close()
m2 := concurrency.NewMutex(s2, "/my-lock/")// 会话s1获取锁
if err := m1.Lock(context.TODO()); err != nil {log.Fatal(err)
}
fmt.Println("acquired lock for s1")m2Locked := make(chan struct{})
go func() {defer close(m2Locked)// 等待直到会话s1释放了/my-lock/的锁if err := m2.Lock(context.TODO()); err != nil {log.Fatal(err)}
}()if err := m1.Unlock(context.TODO()); err != nil {log.Fatal(err)
}
fmt.Println("released lock for s1")<-m2Locked
fmt.Println("acquired lock for s2")

通过源码可以发现concurrency包的实现原理为执行一个 key 的前缀,并在最终设置到 etcd key 的尾部拼接上 /lease_id,租约为 60 秒,且每隔 20 秒续租一次。

在 tryAcquire 函数中通过 put 上述的键值,判断版本号,若设置成功则拿到锁,否则阻塞等待锁:
在这里插入图片描述

阻塞期间先通过查询一次 key 是否存在判断是否阻塞,若不存在表示拿到锁,结束阻塞;存在则通过 watch 监听 key 的变更,仅允许DELETE类型,其他变更操作会报错,并做强制解锁。
在这里插入图片描述
在这里插入图片描述

服务注册与发现

当使用 etcd 实现服务注册与发现时,通常需要以下步骤:

  1. 引入 etcd 的 Go 客户端库
  2. 连接到 etcd 服务器
  3. 注册服务:将服务的信息写入 etcd 中
  4. 发现服务:从 etcd 中获取已注册的服务信息

以下是一个简单示例,演示如何使用 Go 语言操作 etcd 实现基本的服务注册与发现功能。请确保已经安装了 etcd 并启动了 etcd 服务器。

package mainimport ("context""fmt""log""time""go.etcd.io/etcd/clientv3"
)func main() {// 创建 etcd 客户端cli, err := clientv3.New(clientv3.Config{Endpoints:   []string{"localhost:2379"}, // etcd 服务器地址DialTimeout: 5 * time.Second,})if err != nil {log.Fatal(err)}defer cli.Close()// 注册服务serviceName := "my-service"serviceIP := "192.168.1.100"servicePort := "8080"serviceKey := fmt.Sprintf("/services/%s/%s:%s", serviceName, serviceIP, servicePort)serviceValue := "some-metadata-about-the-service"ctx := context.Background()_, err = cli.Put(ctx, serviceKey, serviceValue)if err != nil {log.Fatal(err)}fmt.Printf("Service registered: %s\n", serviceKey)// 发现服务discoveryKey := fmt.Sprintf("/services/%s", serviceName)resp, err := cli.Get(ctx, discoveryKey, clientv3.WithPrefix())if err != nil {log.Fatal(err)}fmt.Println("Discovered services:")for _, kv := range resp.Kvs {fmt.Printf("Key: %s, Value: %s\n", kv.Key, kv.Value)}
}

可以结合 watch 机制优化更新服务变更。

Go 操作 Etcd 参考

go get go.etcd.io/etcd/client/v3
  • 民间文档:http://www.topgoer.com/%E6%95%B0%E6%8D%AE%E5%BA%93%E6%93%8D%E4%BD%9C/go%E6%93%8D%E4%BD%9Cetcd/%E6%93%8D%E4%BD%9Cetcd.html

  • 官方文档:https://github.com/etcd-io/etcd/blob/main/client/v3/README.md

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/57562.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

一起来看看 Compose Accompanist

好久不见&#xff0c;真的挺久了&#xff0c;之前一个月写的文章比现在多半年的都多。今年第一篇文章是简单写了下 Android 14 的适配&#xff1a;Android 14 又来了&#xff1f;别扶&#xff01;抬起我来吧&#xff01; 今天咱们来一起看看 Compose Accompanist 吧&#xff0…

创意项目管理软件推荐:满足客户需求的完美解决方案

发现功能强大的工作管理软件&#xff0c;让创意大放异彩。将您团队的愿景变成引人注目的项目。 一、交付总是令人印象深刻的工作 Zoho Projects的创意项目管理软件可帮助您和您的团队在一个地方监督多个项目。使用我们的内置管理工具和模板&#xff0c;花更少的时间在管理上&a…

普及100Hz高刷+1ms响应 微星发布27寸显示器:仅售799元

不论办公还是游戏&#xff0c;高刷及低响应时间都很重要&#xff0c;微星现在推出了一款27寸显示器PRO MP273A&#xff0c; 售价只有799元&#xff0c;但支持100Hz高刷、1ms响应时间&#xff0c;还有FreeSync技术减少撕裂。 PRO MP273A的100Hz高刷新率是其最大的卖点之一&#…

[每周一更]-(第57期):用Docker、Docker-compose部署一个完整的前后端go+vue分离项目

文章目录 1.参考项目2.技能点3.GO的Dockerfile配置后端的结构如图Dockerfile先手动docker调试服务是否可以启动报错 4.Vue的Dockerfile配置前端的结构如图nginx_docker.confDockerfile构建 5.docker-compose 整合前后端docker-compose.yml错误记录&#xff08;1&#xff09;ip端…

macOS下Django环境搭建-docker运行Django

1. macOS升级pip /Library/Developer/CommandLineTools/usr/bin/python3 -m pip install --upgrade pip 2. 卸载Python3.9.5版本 $ sudo rm -rf /usr/local/bin/python3 $ sudo rm -rf /usr/local/bin/pip3 $ sudo rm -rf /Library/Frameworks/Python.framework 3. 安装P…

Shiro是什么?为什么要用Shiro?

前言 本文小新为大家带来 Shiro入门概述 相关知识&#xff0c;具体内容包括Shiro是什么&#xff0c;为什么要用 Shiro&#xff0c;Shiro与Spring Security 的对比&#xff0c;Shiro的基本功能&#xff08;包括&#xff1a;基本功能框架&#xff0c;功能简介&#xff09;&#x…

【六袆 - 国际化】SpringBoot国际化Message

模拟场景校验请求参数 private void checkParam(List<ReqAppAdminDTO> req) {// 校验管理员如果已存在&#xff0c;则抛出已存在异常req.forEach(item -> {AppAdminDO appAdminDO appAdminMapper.selectByAppIdAndAdminNo(item.getAppId(), item.getAdminNo());if (O…

Jmeter命令行运行实例讲解

1. 简介 使用非 GUI 模式&#xff0c;即命令行模式运行 JMeter 测试脚本能够大大缩减所需要的系统资 本文介绍windows下以命令行模式运行的方法。 1.1. 命令介绍 jmeter -n -t <testplan filename> -l <listener filename> 示例&#xff1a; jmeter -n -t test…

跨境商城系统源码的优势,助力企业海外扩张

跨境电商发展背景与趋势 随着全球化的推进和互联网技术的快速发展&#xff0c;跨境电商已成为企业海外拓展的重要途径。然而&#xff0c;跨境电商面临着诸多挑战&#xff0c;如复杂的海外市场、文化差异、海关监管等。为了解决这些问题&#xff0c;企业可以借助跨境商城系统源码…

试图将更改推送到 GitHub,但是远程仓库已经包含了您本地没有的工作(可能是其他人提交的修改)

这通常是由于其他人或其他仓库推送到了相同的分支上&#xff0c;导致您的本地仓库和远程仓库之间存在冲突。 错误信息&#xff1a; To github.com:8upersaiyan/CKmuduo.git ! [rejected] main -> main (fetch first) error: failed to push some refs to github.com:8upers…

无涯教程-Perl - continue 语句函数

可以在 while 和 foreach 循环中使用continue语句。 continue - 语法 带有 while 循环的 continue 语句的语法如下- while(condition) {statement(s); } continue {statement(s); } 具有 foreach 循环的 continue 语句的语法如下- foreach $a (listA) {statement(s); } co…

SpringBoot集成百度人脸识别实现登陆注册功能Demo(二)

前言 上一篇SpringBoot集成百度人脸demo中我使用的是调用本机摄像头完成人脸注册&#xff0c;本次demo根据业务需求的不同我采用文件上传的方式实现人脸注册。 效果演示 首页 注册 后端响应数据&#xff1a; 登录 后端响应数据&#xff1a; 项目结构 后端代码实现 1、Bai…