【大数据】Zookeeper 数据写入与分布式锁

Zookeeper 数据写入与分布式锁

  • 1.数据是怎么写入的
  • 2.基于 Zookeeper 实现分布式锁

1.数据是怎么写入的

无论是 Zookeeper 自带的客户端 zkCli.sh,还是使用 Python(或者其它语言)实现的客户端,本质上都是连接至集群,然后往里面读写数据。那么问题来了,集群在收到来自客户端的写请求时,是怎么写入数据的呢?

另外客户端在访问集群的时候,本质上是访问集群内的某一个节点,而根据访问的节点是领导者还是追随者,写入数据的过程也会有所不同。

先来看看当 访问的节点是领导者 的情况:

在这里插入图片描述
这里面有一个关键的地方,就是 Leader 不会等到所有的 Follower 都写完,只要有一半的 Follower 写完,就会告知客户端。还是半数机制,一半的 Follower 加上 Leader 正好刚过半数。而这么做的原因也很简单,就是为了快速响应。

再来看另一种情况,如果客户端 访问的节点是追随者,情况会怎么样呢?其实很简单,由于追随者没有写权限,那么会先将写请求转发给领导者,然后接下来的步骤和上面类似,只是最后一步不同。

当 Leader 发现有半数的 Follower 写完,就认为写数据成功,于是返回 ack。但这个 ack 不会返回给客户端,因为客户端访问的不是领导者,最终领导者会将 ack 返回给客户端访问的追随者,再由这个追随者将 ack 返回给客户端,告知写请求已执行完毕。

2.基于 Zookeeper 实现分布式锁

关于分布式锁,我之前介绍过如何基于 Redis 实现分布式锁,里面对分布式锁做了比较详细的解析。下面来聊一聊如何基于 Zookeeper 实现分布式锁。

先来说一下原理,当客户端需要操作共享资源时,需要先往 Zookeeper 集群中创建一个临时顺序节点。然后查看对应的编号,如果没有比它小的,说明最先创建,我们就认为客户端拿到了分布式锁。

如果客户端发现节点的编号不是最小的,说明已经有人先创建了,也就是锁已经被别的客户端拿走了。那么该客户端会对前一个节点进行监听,等待释放。

在这里插入图片描述

所以从概念上还是很好理解的,然后我们来编程实现一下。

from typing import List
import queue
from kazoo.client import KazooClientclass DistributedLock:def __init__(self, hosts: List[str]):""":param hosts: 'ip1:port1,...'"""self.client = KazooClient(",".join(hosts))self.client.start()# 要在 /lock 节点下面创建临时顺序节点# 所以先保证 /lock 节点存在if not self.client.exists("/lock"):self.client.create("/lock")# 要创建的临时顺序节点self.cur_node = None# 要监听的节点(也就是上一个节点)self.prev_node = None# 本地队列self.q = queue.Queue()def acquire(self):"""获取锁:return:"""self.cur_node = self.client.create("/lock/seq-",# 临时顺序节点ephemeral=True,sequence=True)# create 方法会返回创建的节点名称# 需要判断编号是不是最小的# 因此要拿到所有的节点nodes = self.client.get_children("/lock")# nodes: ["seq-000..0", "seq-000...1"]nodes.sort()if len(nodes) == 1:return Trueelif "/lock/" + nodes[0] == self.cur_node:# 如果 nodes 里面的最小值和 node 相等# 说明该客户端创建的节点的编号最小# 于是我们就认为它拿到了分布式锁return True# 否则说明不是最小,因此要找到它的上一个节点# 也就是要监听的节点index = nodes.index(self.cur_node.split("/")[-1])self.prev_node = "/lock/" + nodes[index - 1]# 对上一个节点进行监听self.client.get(self.prev_node, watch=self.watch)# 这一步不是阻塞的,但程序必须要拿到锁之后才可以执行# 所以我们要显式地让程序阻塞在这里self.q.get()return Truedef release(self):"""释放锁:return:"""self.client.delete(self.cur_node)def watch(self, event):"""监听函数,参数 event 是一个 namedtuplekazoo.protocol.states.WatchedEvent里面有三个字段:type、state、path监听节点的值被改变时,type 为 "CHANGED"监听节点被删除时,type 为 "DELETED"path 就是监听的节点本身state 表示客户端和服务端之间的连接状态建立连接时,状态为 LOST连接建立成功,状态为 CONNECTED如果在整个会话的生命周期里,伴随着网络闪断、服务端异常或者其他什么原因导致客户端和服务端连接断开,状态为 SUSPENDED与此同时,KazooClient 会不断尝试与服务端建立连接,直至超时如果连接建立成功了,那么状态会再次切换到 CONNECTED"""if event.type == "DELETED" and \self.prev_node == event.path:# 往队列里面扔一个元素# 让下一个节点解除阻塞self.q.put(None)# 测试函数
def test(lock, name):lock.acquire()print(f"{name}获得锁,其它人等着吧")print(f"{name}处理业务······")print(f"{name}处理完毕,释放锁")lock.release()if __name__ == '__main__':import threadinghosts = ["82.157.146.194:2181",  "121.37.165.252:2181",  "123.60.7.226:2181",    ]# 创建三把锁lock1 = DistributedLock(hosts)lock2 = DistributedLock(hosts)lock3 = DistributedLock(hosts)threading.Thread(target=test, args=(lock1, "客户端1")).start()threading.Thread(target=test, args=(lock2, "客户端2")).start()threading.Thread(target=test, args=(lock3, "客户端3")).start()"""
客户端1获得锁,其它人等着吧
客户端1处理业务······
客户端1处理完毕,释放锁
客户端3获得锁,其它人等着吧
客户端3处理业务······
客户端3处理完毕,释放锁
客户端2获得锁,其它人等着吧
客户端2处理业务······
客户端2处理完毕,释放锁
"""

实现起来不是很难,并且使用 Zookeeper 的好处就是,我们不需要担心死锁的问题。因为客户端宕掉之后,临时节点会自动删除,但缺点是性能没有 Redis 高。

另外值得一提的是,kazoo 已经帮我们实现好了分布式锁,开箱即用,我们就不需要再手动实现了。

# 创建客户端
client = KazooClient(",".join(hosts))
client.start()
# 此时需要自己手动给一个唯一标识
lock = client.Lock("/lock", "unique-identifier")
# 获取锁
lock.acquire()
# 处理业务逻辑
...
# 释放锁
lock.release()
# 或者也可以使用上下文管理器
with lock:...

显然就优雅多了,借助于 kazoo 实现好的分布式锁,可以减轻我们的心智负担。此外 kazoo 还提供了 读锁写锁

  • client.ReadLock
  • client.WriteLock

我们一般使用 client.Lock 就行,可以自己测试一下。


关于 Zookeeper 的基础内容就介绍到这里,但伴随着 Zookeeper 还有一系列的协议,比如 Paxos 协议ZAB 协议CAP 定理 等等,这些可谓是分布式系统的重中之重。我们后续来逐一介绍。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/327801.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

记事本在手机桌面上怎么找?手机里的记事本怎么找?

在日常生活、工作和学习中,我们时常需要随手记录一些重要的事项、灵感闪现的瞬间或者是待办的任务。比如,在超市购物前,列出购物清单;在开会时,记下重要的讨论点;在学习时,捕捉那一刹那的灵感。…

C语言-第十八周做题总结-数组3

id:454 A.字符串逆序 题目描述 输入一个字符串,对该字符串进行逆序,输出逆序后的字符串。 输入 输入在一行中给出一个不超过80个字符长度的、以回车结束的非空字符串。 输出 在一行中输出逆序后的字符串。 输入样例 输出样例 题解 先用一个while…

【大数据进阶第三阶段之Hive学习笔记】Hive基础入门

目录 1、什么是Hive 2、Hive的优缺点 2.1、 优点 2.2、 缺点 2.2.1、Hive的HQL表达能力有限 2.2.2、Hive的效率比较低 3、Hive架构原理 3.1、用户接口:Client 3.2、元数据:Metastore 3.3、Hadoop 3.4、驱动器:Driver Hive运行机制…

2.3_6 用信号量实现进程互斥、同步、前驱关系

2.3_6 用信号量实现进程互斥、同步、前驱关系 #mermaid-svg-fj0wp6tJGfadcT8h {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-fj0wp6tJGfadcT8h .error-icon{fill:#552222;}#mermaid-svg-fj0wp6tJGfadcT8h .error-t…

在Ubuntu22.04上离线部署Tailchat(一)

一:Tailchat介绍 Tailchat 是一款插件化易拓展的开源 IM 应用。可拓展架构赋予 Tailchat 无限可能性。前端微内核架构 后端微服务架构 使得 Tailchat 能够驾驭任何定制化/私有化的场景,是面向企业与私域用户打造,高度自由的群组管理与定制化…

第11章 GUI Page462~476 步骤二十三,二十四,二十五 Undo/Redo ③实现“Undo/Redo”菜单项

工程六 添加“编辑”菜单和子菜单 菜单ID分别为 idMenuEditUndo 和 idMenuEditRedo 热键(快捷键)分别为CtrlZ 和 CtrlShiftZ 变量名分别为 MenuItemEditUndo 和 MenuItemEditRedo 分别添加事件 ActionLink类增加成员函数 运行效果:“添加…

C语言注意点(2)

1.使用pow函数的相关问题 局部变量n0 while(num/pow(10,n)) n; 为什么不可行 printf("%d",num/pow(10,4)%10) 为什么要提前用temp先引出来 答:pow函数的返回值为double类型,1.终止条件不会满足 2.num/pow(10,4)结果为浮点型,浮…

运维工程师的出路

运维工程师的出路到底在哪里? 你是不是也常常听到身边的运维人员抱怨,他们的出路到底在哪里呢?别着急,让我告诉你,运维人员就像是IT界的“万金油”,他们像“修理工”一样维修服务器,像“消防员…

基于日照时数计算逐日太阳辐射

基于日照时数计算逐日太阳辐射

配置cendos 安装docker 配置阿里云国内加速

由于我安装的cendos是镜像版。已经被配置好了。所以只需要更新相关配置信息即可。 输入 yum update自动更新所有配置 更新完成后输入 yum list docker-ce --showduplicates | sort -r 自动查询所有可用的docker版本 输入 yum install docker-ce docker-ce-cli container…

STM32-03-STM32HAL库

文章目录 STM32HAL库1. HAL库介绍2. STM32Cube固件包3. HAL库框架结构4. 新建HAL版本MDK工程 STM32HAL库 1. HAL库介绍 HAL库 HAL,英文全称 Hardware Abstraction Layer,即硬件抽象层。HAL库是ST公司提供的外设驱动代码的驱动库,用户只需要调…

LiveGBS流媒体平台GB/T28181功能-用户管理通道权限管理关联通道支持只看已选只看未选添加用户备注角色

LiveGBS功能用户管理通道权限管理关联通道支持只看已选只看未选添加用户备注角色 1、用户管理2、添加用户3、关联通道3.1、只看已选3.2、只看未选 4、自定义角色5、搭建GB28181视频直播平台 1、用户管理 LiveGBS支持用户管理,添加用户,及配置相关用户权…