深入理解Zookeeper系列-2.Zookeeper基本使用和分布式锁原理

  • 👏作者简介:大家好,我是爱吃芝士的土豆倪,24届校招生Java选手,很高兴认识大家
  • 📕系列专栏:Spring源码、JUC源码、Kafka原理、分布式技术原理
  • 🔥如果感觉博主的文章还不错的话,请👍三连支持👍一下博主哦
  • 🍂博主正在努力完成2023计划中:源码溯源,一探究竟
  • 📝联系方式:nhs19990716,加我进群,大家一起学习,一起进步,一起对抗互联网寒冬👀

文章目录

  • 集群环境安装
  • Zookeeper java客户端的使用
    • Curator
    • 代码
    • 权限操作
      • 权限模式
  • 节点监听
  • 分布锁的实现

集群环境安装

在zookeeper集群中,各个节点总共有三种角色,分别是:leader,follower,observer

集群模式我们采用模拟3台机器来搭建zookeeper集群。分别复制安装包到三台机器上并解压,同时copy一份zoo.cfg。

  • 修改配置文件
  1. 修改端口
  2. server.1=IP1:2888:3888 【2888:访问zookeeper的端口;3888:重新选举leader的端口】
  3. server.2=IP2.2888:3888
  4. server.3=IP3.2888:2888
  • server.A=B:C:D:其 中
  1. A 是一个数字,表示这个是第几号服务器;
  2. B 是这个服务器的 ip地址;
  3. C 表示的是这个服务器与集群中的 Leader 服务器交换信息的端口;
  4. D 表示的是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新
    的 Leader,而这个端口就是用来执行选举时服务器相互通信的端口。如果是伪集群的配置方
    式,由于 B 都是一样,所以不同的 Zookeeper 实例通信端口号不能一样,所以要给它们分配
    不同的端口号。
  5. 在集群模式下,集群中每台机器都需要感知到整个集群是由哪几台机器组成的,在配置文件
    中,按照格式server.id=host:port:port,每一行代表一个机器配置。id: 指的是server ID,用
    来标识该机器在集群中的机器序号
  • 新建datadir目录,设置myid

在每台zookeeper机器上,我们都需要在数据目录(dataDir)下创建一个myid文件,该文件只有一行内容,对应每台机器的Server ID数字;比如server.1的myid文件内容就是1。【必须确保每个服务器的myid文件中的数字不同,并且和自己所在机器的zoo.cfg中server.id的id值一致,id的范围是1~255】

  • 启动zookeeper

需要注意的是,如果使用云服务器搭建的话,需要开放端口。

Zookeeper java客户端的使用

针对zookeeper,比较常用的Java客户端有zkclient、curator。由于Curator对于zookeeper的抽象层次
比较高,简化了zookeeper客户端的开发量。使得curator逐步被广泛应用。

  1. 封装zookeeper client与zookeeper server之间的连接处理
  2. 提供了一套fluent风格的操作api
  3. 提供zookeeper各种应用场景(共享锁、leader选举)的抽象封装

Curator

<dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>4.2.0</version>
</dependency>
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>4.2.0</version>
</dependency>

代码

public static void main(String[] args) throws Exception {CuratorFramework curatorFramework=CuratorFrameworkFactory.builder().connectString("192.168.216.128:2181,192.168.216.129:2181,192.168.216.130:2181").sessionTimeoutMs(5000). // 会话超时,定时心跳机制retryPolicy(new ExponentialBackoffRetry(1000,3)).//重试connectionTimeoutMs(4000).build();curatorFramework.start(); //表示启动.
//创建
//        create(curatorFramework);
//修改
//        update(curatorFramework);
//查看
//        get(curatorFramework);operatorWithAsync(curatorFramework);create(curatorFramework);}private static String get(CuratorFramework curatorFramework) throws Exception {String rs=new String(curatorFramework.getData().forPath("/first_auth"));System.out.println(rs);return rs;}private static String create(CuratorFramework curatorFramework) throws Exception {String path=curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/first","Hello Gupaao".getBytes());System.out.println("创建成功的节点: "+path);return path;}private static String update(CuratorFramework curatorFramework) throws Exception {curatorFramework.setData().forPath("/first","Hello GuPaoEdu.cn".getBytes());return null;}//异步访问 | 同步(future.get())//redissonprivate static String operatorWithAsync(CuratorFramework curatorFramework) throws Exception {// 之前说过,数据同步的时候需要投票,如果我们可以使用异步的请求CountDownLatch countDownLatch = new CountDownLatch(1);curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).inBackground(new BackgroundCallback() {@Overridepublic void processResult(CuratorFramework client, CuratorEvent event) throws Exception {System.out.println(Thread.currentThread().getName()+":"+event.getResultCode());countDownLatch.countDown();}}).forPath("/second","second".getBytes());//TODO ...System.out.println("before");countDownLatch.await(); //阻塞System.out.println("after");return "";}测试 进入zookeeper
ls /
get first   就可以看到这个数据了

权限操作

我们可以设置当前节点增删改查的权限。

read
write(修改)
delete
create(创建)
admin
简写: rwdca

private static String authOperation(CuratorFramework curatorFramework) throws Exception {List<ACL> acls=new ArrayList<>();ACL acl=new ACL(ZooDefs.Perms.CREATE| ZooDefs.Perms.DELETE,new Id("digest", DigestAuthenticationProvider.generateDigest("u1:u1")));ACL acl1=new ACL(ZooDefs.Perms.ALL,new Id("digest", DigestAuthenticationProvider.generateDigest("u2:u2")));acls.add(acl);acls.add(acl1);curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).withACL(acls).forPath("/first_auth","123".getBytes());return null;}List<AuthInfo> list=new ArrayList<>();AuthInfo authInfo=new AuthInfo("digest","u2:u2".getBytes());list.add(authInfo);CuratorFramework curatorFramework=CuratorFrameworkFactory.builder().connectString("192.168.216.128:2181,192.168.216.129:2181,192.168.216.130:2181").sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000,3)).connectionTimeoutMs(4000).authorization(list).build();curatorFramework.start(); //表示启动.

权限模式

  • Ip 通过ip地址粒度来进行权限控制,例如配置 [ip:192.168.0.1], 或者按照网段 ip:192.168.0.1/24 ;
  • Digest:最常用的控制模式,类似于 username:password ;设置的时候需要
  • DigestAuthenticationProvider.generateDigest() SHA-加密和base64编码
  • World: 最开放的控制模式,这种权限控制几乎没有任何作用,数据的访问权限对所有用户开放。 world:anyone
  • Super: 超级用户,可以对节点做任何操作
  • auth 不需要id。不过这里应该用 expression 来表示。即(scheme:expression:perm)

节点监听

  • 当前节点的创建(NodeCreated)
  • 子节点的变更事件(NodeChildrenChanged) ->Dubbo
  • 当前被监听的节点的数据变更事件:NodeDataChanged
  • 当前节点被删除的时候会触发 NodeDeleted

ZooKeeper zooKeeper;public void originApiTest() throws IOException, KeeperException, InterruptedException {ZooKeeper zooKeeper=new ZooKeeper("192.168.216.128:2181", 5000, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {//表示连接成功之后,会产生的回调时间}});Stat stat=new Stat();zooKeeper.getData("/first", new DataWatchListener(),stat); //针对当前节点/*  zooKeeper.exists();  //针对当前节点zooKeeper.getChildren();  //针对子节点的监听*/}class DataWatchListener implements Watcher{@Overridepublic void process(WatchedEvent watchedEvent) {// 事件回调String path=watchedEvent.getPath();// 再次注册监听try {zooKeeper.getData(path,this,new Stat());} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}}
private static void addNodeCacheListener(CuratorFramework curatorFramework,String path) throws Exception {NodeCache nodeCache=new NodeCache(curatorFramework,path,false);NodeCacheListener nodeCacheListener=new NodeCacheListener() {@Overridepublic void nodeChanged() throws Exception {System.out.println("Receive Node Changed");System.out.println(""+nodeCache.getCurrentData().getPath()+"->"+new String(nodeCache.getCurrentData().getData()));}};nodeCache.getListenable().addListener(nodeCacheListener);nodeCache.start();}private static void addPathChildCacheListener(CuratorFramework curatorFramework,String path) throws Exception {PathChildrenCache childrenCache=new PathChildrenCache(curatorFramework,path,true);PathChildrenCacheListener childrenCacheListener=new PathChildrenCacheListener() {@Overridepublic void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {System.out.println("子节点事件变更的回调");ChildData childData=pathChildrenCacheEvent.getData();System.out.println(childData.getPath()+"-"+new String(childData.getData()));}};childrenCache.getListenable().addListener(childrenCacheListener);childrenCache.start(PathChildrenCache.StartMode.NORMAL);}addNodeCacheListener(curatorFramework,"/first");addPathChildCacheListener(curatorFramework,"/first");需要在main方法中 不让其结束
System.in.read();

分布锁的实现

在这里插入图片描述

两个线程访问一个共享资源,就会造成数据的不确定性。所以需要加锁。

在这里插入图片描述

但是在分布式的场景下,线程变成进程

在这里插入图片描述

那么应该怎么做呢?如果使用Zookeeper来实现呢?

按照zookeeper的特性,只会有一个节点成功,其他的都是失败特性。如果处理完了,其他节点监听这个,当成功的那个节点删除了之后,回调通知再次获得锁即可。

在这里插入图片描述

但是会存在一个问题,比如说有100个节点,那么他就会触发99次来通知剩下的节点,为了解决这样的一个问题,一次性唤醒所有的话,我们可以使用顺序节点

在这里插入图片描述

先写入后,先排队

这样的话,我们每个节点只需要监听上一个顺序的变化即可,如果我们发现了一个节点删除了,然后去判断自己是不是序号最好的就ok,如果是最小的,那就发起获取锁的动作,如果不是就等着。

在这里插入图片描述

CuratorFramework curatorFramework=CuratorFrameworkFactory.builder().connectString("192.168.216.128:2181,192.168.216.129:2181,192.168.216.130:2181").sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000,3)).connectionTimeoutMs(4000).build();curatorFramework.start(); //表示启动./*** locks 表示命名空间* 锁的获取逻辑是放在zookeeper* 当前锁是跨进程可见*/InterProcessMutex lock=new InterProcessMutex(curatorFramework,"/locks");for(int i=0;i<10;i++){new Thread(()->{System.out.println(Thread.currentThread().getName()+"->尝试抢占锁");try {lock.acquire();//抢占锁,没有抢到,则阻塞System.out.println(Thread.currentThread().getName()+"->获取锁成功");} catch (Exception e) {e.printStackTrace();}try {Thread.sleep(4000);lock.release(); //释放锁System.out.println(Thread.currentThread().getName()+"->释放锁成功");} catch (InterruptedException e) {e.printStackTrace();} catch (Exception e) {e.printStackTrace();}},"t-"+i).start();}}

InterProcessMutex

private final ConcurrentMap<Thread, InterProcessMutex.LockData> threadData;// 首先看 acquire 方法
public void acquire() throws Exception {if (!this.internalLock(-1L, (TimeUnit)null)) {throw new IOException("Lost connection while trying to acquire lock: " + this.basePath);}}private boolean internalLock(long time, TimeUnit unit) throws Exception {// 获得当前线程Thread currentThread = Thread.currentThread();InterProcessMutex.LockData lockData = (InterProcessMutex.LockData)this.threadData.get(currentThread);if (lockData != null) {// 首先判断在同一个线程是否有重入的情况// 如果有重入,则 +1lockData.lockCount.incrementAndGet();return true;} else {// 如果没有重入String lockPath = this.internals.attemptLock(time, unit, this.getLockNodeBytes());if (lockPath != null) {// 说明注册成功InterProcessMutex.LockData newLockData = new InterProcessMutex.LockData(currentThread, lockPath);// 存进map中this.threadData.put(currentThread, newLockData);return true;} else {return false;}}}进入 attemptLockString attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {long startMillis = System.currentTimeMillis();Long millisToWait = unit != null ? unit.toMillis(time) : null;byte[] localLockNodeBytes = this.revocable.get() != null ? new byte[0] : lockNodeBytes;int retryCount = 0;String ourPath = null;boolean hasTheLock = false;boolean isDone = false;// 这里面是一个死循环while(!isDone) {isDone = true;try {// try里面的逻辑,会在循环中会去创建一个锁ourPath = this.driver.createsTheLock(this.client, this.path, localLockNodeBytes);hasTheLock = this.internalLockLoop(startMillis, millisToWait, ourPath);} catch (NoNodeException var14) {// catch里面的逻辑实际上是重试逻辑if (!this.client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper())) {throw var14;}isDone = false;}}return hasTheLock ? ourPath : null;}进入createsTheLockpublic String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception {// 本质上就是创建一个临时有序节点String ourPath;if (lockNodeBytes != null) {ourPath = (String)((ACLBackgroundPathAndBytesable)client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)).forPath(path, lockNodeBytes);} else {ourPath = (String)((ACLBackgroundPathAndBytesable)client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)).forPath(path);}return ourPath;}// try里面的逻辑,会在循环中会去创建一个锁ourPath = this.driver.createsTheLock(this.client, this.path, localLockNodeBytes);
// 此时去判断拿没拿到锁,拿到了以后去判断是不是最小的hasTheLock = this.internalLockLoop(startMillis, millisToWait, ourPath);internalLockLoopprivate boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {boolean haveTheLock = false;boolean doDelete = false;try {if (this.revocable.get() != null) {((BackgroundPathable)this.client.getData().usingWatcher(this.revocableWatcher)).forPath(ourPath);}while(this.client.getState() == CuratorFrameworkState.STARTED && !haveTheLock) { // while循环判断客户端的连接没有断开,并且没有获得锁的情况下// 拿到排序之后的节点List<String> children = this.getSortedChildren();String sequenceNodeName = ourPath.substring(this.basePath.length() + 1);// 去执行一个判断锁的逻辑PredicateResults predicateResults = this.driver.getsTheLock(this.client, children, sequenceNodeName, this.maxLeases);// 是否获得锁if (predicateResults.getsTheLock()) {haveTheLock = true;} else {// 否则进入监听的逻辑String previousSequencePath = this.basePath + "/" + predicateResults.getPathToWatch();synchronized(this) {try {((BackgroundPathable)this.client.getData().usingWatcher(this.watcher)).forPath(previousSequencePath);if (millisToWait == null) {// 在监听中告诉其等待this.wait(); } else {millisToWait = millisToWait - (System.currentTimeMillis() - startMillis);startMillis = System.currentTimeMillis();if (millisToWait > 0L) {this.wait(millisToWait);} else {doDelete = true;break;}}} catch (NoNodeException var19) {}}}}} catch (Exception var21) {ThreadUtils.checkInterrupted(var21);doDelete = true;throw var21;} finally {if (doDelete) {this.deleteOurPath(ourPath);}}return haveTheLock;}进入getsTheLockpublic PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {// 得到索引,验证合法性int ourIndex = children.indexOf(sequenceNodeName);validateOurIndex(sequenceNodeName, ourIndex);// 判断是不是最小的,如果不是就取 -1之后的数boolean getsTheLock = ourIndex < maxLeases;String pathToWatch = getsTheLock ? null : (String)children.get(ourIndex - maxLeases);return new PredicateResults(pathToWatch, getsTheLock);// 首先,通过children.indexOf(sequenceNodeName)方法获取当前客户端创建的节点在子节点列表中的索引位置,并验证其合法性。然后,判断当前节点是否是最小的(即序号最小)。如果是最小的,则直接获取锁;否则,通过计算得到当前节点前面的一个节点名称,并将其设置为需要监听的节点路径,等待该节点释放锁后再尝试获取锁。}-----------------------------------------------释放
// 当收到这个节点发生变化以后
private final Watcher watcher = new Watcher() {public void process(WatchedEvent event) {LockInternals.this.client.postSafeNotify(LockInternals.this);}};
// 去唤醒当前的进程下处于阻塞的线程
default CompletableFuture<Void> postSafeNotify(Object monitorHolder) {return this.runSafe(() -> {synchronized(monitorHolder) {monitorHolder.notifyAll();}});}

比如说用户服务有个线程去监控,不可能是不断的轮询,没什么意义,那么发现没办法抢占就先阻塞,也就是抢占失败,当前一个节点被删除了之后,会有一个watcher通知,那么就会去唤醒,那么会再次调用这个逻辑,判断是不是最小的,如果是就抢占到了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/236045.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

nginx部署多个vue或react项目

下载nginx(tar.gz) nginx: download(官方地址) 部署nginx # 进入nginx压缩包所在目录 cd /usr/nginx# 解压 tar -zxvf nginx-1.25.3.tar.gz# 安装nginx的相关依赖 yum -y install gcc zlib zlib-devel pcre-devel openssl openssl-devel# 生成Makefile可编译文件 cd /usr/ng…

flutter开发实战-轮播Swiper更改Custom_layout样式中Widget层级

flutter开发实战-轮播Swiper更改Custom_layout样式中Widget层级 在之前的开发过程中&#xff0c;需要实现卡片轮播效果&#xff0c;但是卡片轮播需要中间大、两边小一些的效果&#xff0c;这里就使用到了Swiper。具体效果如视频所示 添加链接描述 这里需要的效果是中间大、两边…

No matching version found for @babel/compat-data@^7.23.5 处理

npm ERR! notarget No matching version found for babel/compat-data^7.23.5 处理 报错信息 npm WARN ERESOLVE overriding peer dependency npm ERR! code ETARGET npm ERR! notarget No matching version found for babel/compat-data^7.23.5. npm ERR! notarget In most …

变速箱壳体铸造件自动化三维测量室厂家自动化检测偏差比对-CASAIM-IS(2ND)

一、背景介绍&#xff1a; 随着制造业的快速发展&#xff0c;对产品质量和生产效率的要求不断提高。壳体铸造件作为一种常见的机械零部件&#xff0c;广泛应用于各个领域&#xff0c;对壳体铸造件的质量可靠性的要求也越来越高&#xff0c;因此&#xff0c;对壳体铸造件进行精…

单链表相关经典算法OJ题:移除链表元素

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 目录 前言 题目&#xff1a;移除链表元素 解法一&#xff1a; 解法一的代码实现&#xff1a; 解法二&#xff1a; 解法二代码的实现&#xff1a; 总结 前言 世上有两种耀眼的…

Python 进阶(十二):随机数(random 模块)

《Python入门核心技术》专栏总目录・点这里 文章目录 1. 导入random库2. 常用随机数函数2.1 生成随机浮点数2.2 生成随机整数2.3 从序列中随机选择2.4 随机打乱序列3. 设置随机数种子4. 应用实例4.1 游戏开发4.2 数据分析4.3 加密与安全4.4 模拟实验5. 总结大家好,我是水滴~~ …

长沙电信大楼火灾调查报告发布:系烟头引发。FIS来护航安全

近日&#xff0c;长沙电信大楼的火灾调查报告引起广泛关注。调查发现&#xff0c;火灾是由未熄灭的烟头引发&#xff0c;烟头点燃了室外平台的易燃物&#xff0c;迅速蔓延至整个建筑。这起悲剧再次提醒我们&#xff0c;小小的疏忽可能酿成大灾难。但如果我们能及时发现并处理这…

Git常用命令#merge分支合并

要查看所有分支&#xff0c;包括本地和远程仓库的分支&#xff0c;可以使用以下命令&#xff1a; 1.查看分支 1.1 查看本地分支 git branch这个命令会列出本地所有的分支&#xff0c;当前所在的分支会有 * 标记。 1.2 查看远程分支 git branch -r这个命令会列出远程仓库的分…

XUbuntu22.04之安装OBS30.0强大录屏工具(一百九十五)

简介&#xff1a; CSDN博客专家&#xff0c;专注Android/Linux系统&#xff0c;分享多mic语音方案、音视频、编解码等技术&#xff0c;与大家一起成长&#xff01; 优质专栏&#xff1a;Audio工程师进阶系列【原创干货持续更新中……】&#x1f680; 优质专栏&#xff1a;多媒…

邮政快递物流查询,将指定某天签收的单号筛选出来

批量查询邮政快递单号的物流信息&#xff0c;将指定某天签收的单号筛选出来。 所需工具&#xff1a; 一个【快递批量查询高手】软件 邮政快递单号若干 操作步骤&#xff1a; 步骤1&#xff1a;运行【快递批量查询高手】软件&#xff0c;并登录 步骤2&#xff1a;点击主界面左…

HarmonyOS4.0 ArkUI组件

目录 简介 搭建开发环境 ArkUI基础组件 Image组件 Text组件 TextInput Button Slider 简介 HarmonyOS 4.0的ArkUI组件是一套UI开发框架&#xff0c;提供开发者进行应用UI开发时所必须的能力。在ArkUI中&#xff0c;组件是界面搭建与显示的最小单位&#xff0c;开发者通过…

Redis7--基础篇5(管道、发布订阅)

管道是什么 管道(pipeline)可以一次性发送多条命令给服务端&#xff0c;服务端依次处理完完毕后&#xff0c;通过一条响应一次性将结果返回&#xff0c;通过减少客户端与redis的通信次数来实现降低往返延时时间。pipeline实现的原理是队列&#xff0c;先进先出特性就保证数据的…