聊一下Redis实现分布式锁的8大坑

前两篇文章都在讲 Redis 的 5 大常用数据类型,以及典型的 10 大应用场景。
那么今天就来看看 Redis 实现分布式锁。

聊一聊Redis实现分布式锁的8大坑

Redis中5大常见数据类型用法

工作中Redis用的最多的10种场景

在分布式系统中,保证资源的互斥访问是一个关键的点,而 Redis 作为高性能的键值存储系统,在分布式锁这块也被广泛的应用。然而,在使用 Redis 实现分布式锁时需要考虑很多的因素,以确保系统正确的使用还有程序的性能。

下面我们将探讨一下使用Redis实现分布式锁时需要注意的关键点。

首先还是大家都知道,使用 Redis 实现分布式锁,是两步操作,设置一个key,增加一个过期时间,所以我们首先需要保证的就是这两个操作是一个原子操作。

1、原子性

在获取锁和释放锁的过程中,要保证这个操作的原子性,确保加锁操作与设置过期时间操作是原子的。Redis 提供了原子操作的命令,如SETNX(SET if Not eXists)或者 SET 命令的带有NX(Not eXists)选项,可以用来确保锁的获取和释放是原子的。

String result = jedis.set(lockKey, requestId, "NX", "PX", expireTime);
if ("OK".equals(result)) {return true;
}
return false;

2、锁的过期时间

为了保证锁的释放,防止死锁的发生,获取到的锁需要设置一个过期时间,也就是说当锁的持有者因为出现异常情况未能正确的释放锁时,锁也会到达这个时间之后自动释放,避免对系统造成影响。


try{String result = jedis.set(lockKey, requestId, "NX", "PX", expireTime);if ("OK".equals(result)) {return true;}return false;
} finally {unlock(lockKey);
}  

此时有些朋友可能就会说,如果释放锁的过程中,发生系统异常或者网络断线问题,不也会造成锁的释放失败吗?

是的,这个极小概率的问题确实是存在的。所以我们设置锁的过期时间就是必须的。当发生异常无法主动释放锁的时候,就需要靠过期时间自动释放锁了。

不管操作成功与否,都要释放锁,不能忘了释放锁,可以说锁的过期时间就是对忘了释放锁的一个兜底。

3、锁的唯一标识

在上面对锁都加锁正常的情况下,在锁释放时,能正确的释放自己的锁吗,所以每个客户端应该提供一个唯一的标识符,确保在释放锁时能正确的释放自己的锁,而不是释放成为其他的锁。一般可以使用客户端的ID作为标识符,在释放锁时进行比较,确保只有当持有锁的客户端才能释放自己的锁。

如果我们加的锁没有加入唯一标识,在多线程环境下,可能就会出现释放了其他线程的锁的情况发生。

有些朋友可能就会说了,在多线程环境中,线程A加锁成功之后,线程B在线程A没有释放锁的前提下怎么可以再次获取到锁呢?所以也就没有释放其他线程的锁这个说法。

下面我们看这么一个场景,如果线程A执行任务需要10s,锁的时间是5s,也就是当锁的过期时间设置的过短,在任务还没执行成功的时候就释放了锁,此时,线程B就会加锁成功,等线程A执行任务执行完成之后,执行释放锁的操作,此时,就把线程B的锁给释放了,这不就出问题了吗。

所以,为了解决这个问题就是在锁上加入线程的ID或者唯一标识请求ID。
对于锁的过期时间短这个只能根据业务处理时间大概的计算一个时间,还有就是看门狗,进行锁的续期。

伪代码如下

if (jedis.get(lockKey).equals(requestId)) {jedis.del(lockKey);return true;
}
return false;

4、锁非阻塞获取

非阻塞获取意味着获取锁的操作不会阻塞当前线程或进程的执行。通常,在尝试获取锁时,如果锁已经被其他客户端持有,常见的做法是让当前线程或进程等待直到锁被释放。这种方式称为阻塞获取锁。

相比之下,非阻塞获取锁不会让当前线程或进程等待锁的释放,而是立即返回获取锁的结果。如果锁已经被其他客户端持有,那么获取锁的操作会失败,返回一个失败的结果或者一个空值,而不会阻塞当前线程或进程的执行。

非阻塞获取锁通常适用于一些对实时性要求较高、不希望阻塞的场景,比如轮询等待锁的释放。当获取锁失败时,可以立即执行一些其他操作或者进行重试,而不需要等待锁的释放。

在 Redis 中,可以使用 SETNX 命令尝试获取锁,如果返回成功(即返回1),表示获取锁成功;如果返回失败(即返回0),表示获取锁失败。通过这种方式,可以实现非阻塞获取锁的操作。

try {Long start = System.currentTimeMillis();while(true) {String result = jedis.set(lockKey, requestId, "NX", "PX", expireTime);if ("OK".equals(result)) {if(!exists(path)) {mkdir(path);}return true;}long time = System.currentTimeMillis() - start;if (time>=timeout) {return false;}try {Thread.sleep(50);} catch (InterruptedException e) {e.printStackTrace();}}
} finally{unlock(lockKey,requestId);
}  
return false;

在规定的时间范围内,假如说500ms,自旋不断获取锁,不断尝试加锁。

如果成功,则返回。如果失败,则休息50ms然后在开始重试获取锁。
如果到了超时时间,也就是500ms时,则直接返回失败。

说到了多次尝试加锁,在 Redis,分布式锁是互斥的,假如我们对某个 key 进行了加锁,如果 该key 对应的锁还没有释放的话,在使用相同的key去加锁,大概率是会失败的。

下面有这样一个场景,需要获取满足条件的菜单树,后台程序在代码中递归的去获取,知道获取到所有的满足条件的数据。我们要知道,菜单是可能随时都会变的,所以这个地方是可以加入分布式锁进行互斥的。

后台程序在递归获取菜单树的时候,第一层加锁成功,第二层、第n层 加锁不久加锁失败了吗?

递归中的加锁伪代码如下

private int expireTime = 1000;public void fun(int level,String lockKey,String requestId){try{String result = jedis.set(lockKey, requestId, "NX", "PX", expireTime);if ("OK".equals(result)) {if(level<=10){this.fun(++level,lockKey,requestId);} else {return;}}return;} finally {unlock(lockKey,requestId);}
}

如果我们直接使用的话,看起来问题不大,但是真正执行程序之后,就会发现报错啦。

因为从根节点开始,第一层递归加锁成功之后,还没有释放这个锁,就直接进入到了第二层的递归之中。因为锁名为lockKey,并且值为requestId的锁已经存在,所以第二层递归大概率会加锁失败,最后就是返回结果,只有底层递归的结果返回了。

所以,我们还需要一个可重入的特性。

5、可重入

redisson 框架中已经实现了可重入锁的功能,所以我们可以直接使用

private int expireTime = 1000;public void run(String lockKey) {RLock lock = redisson.getLock(lockKey);this.fun(lock,1);
}public void fun(RLock lock,int level){try{lock.lock(5, TimeUnit.SECONDS);if(level<=10){this.fun(lock,++level);} else {return;}} finally {lock.unlock();}
}

上述的代码仅供参考,这也只是提供一个思路。

下面我们还是聊一下 redisson 可重入锁的原理。

加锁主要通过以下代码实现的。

if (redis.call('exists', KEYS[1]) == 0) 
then  redis.call('hset', KEYS[1], ARGV[2], 1);        redis.call('pexpire', KEYS[1], ARGV[1]); return nil; 
end;
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) 
then  redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; 
end;
return redis.call('pttl', KEYS[1]);
  • KEYS[1]:锁名

  • ARGV[1]:过期时间

  • ARGV[2]:uuid + “:” + threadId,可认为是requestId

1、先判断如果加锁的key不存在,则加锁。

2、接下来判断如果keyrequestId值都存在,则使用hincrby命令给该keyrequestId值计数,每次都加1。注意一下,这里就是重入锁的关键,锁重入一次值就加1

3、如果当前 key 存在,但值不是 requestId ,则返回过期时间。

释放锁的脚本如下

if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) 
then return nil
end
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
if (counter > 0) 
then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; 
end; 
return nil

1、先判断如果 锁名keyrequestId 值不存在,则直接返回。

2、如果 锁名keyrequestId 值存在,则重入锁减1

3、如果减1后,重入锁的 value 值还大于0,说明还有引用,则重试设置过期时间。

4、如果减1后,重入锁的 value 值还等于0,则可以删除锁,然后发消息通知等待线程抢锁。

6、锁竞争

对于大量写入的业务场景,使用普通的分布式锁就可以实现我们的需求。但是对于写入操作少的,有大量读取操作的业务场景,直接使用普通的redis锁就会浪费性能了。所以对于锁的优化来说,我们就可以从业务场景,读写锁来区分锁的颗粒度,尽可能将锁的粒度变细,提升我们系统的性能。

6.1、读写锁

对于降低锁的粒度,上面我们知道了读写锁也算事在业务层面进行降低锁粒度的一种方式,所以下面我们以 redisson 框架为例,看看实现读写锁是如何实现的。

读锁

RReadWriteLock readWriteLock = redisson.getReadWriteLock("readWriteLock");
RLock rLock = readWriteLock.readLock();
try {rLock.lock();//业务操作
} catch (Exception e) {log.error(e);
} finally {rLock.unlock();
}

写锁

RReadWriteLock readWriteLock = redisson.getReadWriteLock("readWriteLock");
RLock rLock = readWriteLock.writeLock();
try {rLock.lock();//业务操作
} catch (InterruptedException e) {log.error(e);
} finally {rLock.unlock();
}

通过讲锁分为读锁与写锁,最大的提升之后就在与大大的提高系统的读性能,因为读锁与读锁之间是没有冲突的,不存在互斥,然后又因为业务系统中的读操作是远远多与写操作的,所以我们在提升了读锁的性能的同时,系统整体锁的性能都得到了提升。

读写锁特点

  • 读锁与读锁不互斥,可共享
  • 读锁与写锁互斥
  • 写锁与写锁互斥

6.2、分段锁

上面我们通过业务层面的读写锁进行了锁粒度的减小,下面我们在通过锁的分段减少锁粒度实现锁性能的提升。

如果你对 concurrentHashMap 的源码了解的话你就会知道分段锁的原理了。是的就是你想的那样,把一个大的锁划分为多个小的锁。

举个例子,假如我们在秒杀100个商品,那么常规做法就是一个锁,锁 100个商品,那么分段的意思就是,将100个商品分成10份,相当于有 10 个锁,每个锁锁定10个商品,这也就提升锁的性能提升了10倍。

具体的实现就是,在秒杀的过程中,对用户进行取模操作,算出来当前用户应该对哪一份商品进行秒杀。

通过上述将大锁拆分为小锁的过程,以前多个线程只能争抢一个锁,现在可以争抢10个锁,大大降低了冲突,提升系统吞吐量。

不过需要注意的就是,使用分段锁确实可以提升系统性能,但是相对应的就是编码难度的提升,并且还需要引入取模等算法,所以我们在实际业务中,也要综合考虑。

7、锁超时

在上面我们也说过了,因为业务执行时间太长,导致锁自动释放了,也就是说业务的执行时间远远大于锁的过期时间,这个时候 Redis 会自动释放该锁。

针对这种情况,我们可以使用锁的续期,增加一个定时任务,如果到了超时时间,业务还没有执行完成,就需要对锁进行一个续期。

Timer timer = new Timer(); 
timer.schedule(new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {//自动续期逻辑}
}, 10000, TimeUnit.MILLISECONDS);

获取到锁之后,自动的开启一个定时任务,每隔 10s 中自动刷新一次过期时间。这种机制就是上面我们提到过的看门狗。

对于自动续期操作,我们还是推荐使用 lua 脚本来实现

if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]);return 1; 
end;
return 0;

需要注意的一点就是,锁的续期不是一直续期的,业务如果一直执行不完,到了一个总的超时时间,或者执行续期的次数超过几次,我们就不再进行续期操作了。

上面我们讲了这么几个点,下面我们来说一下 Redis 集群中的问题,如果发生网络分区,主从切换问题,那么该怎么解决呢?

8、网络分区

假设 Redis 初始还是主从,一主三从模式。

Redis 的加锁操作都是在 master 上操作,成功之后异步不同到 slave上。

master 宕机之后,我们就需要在三个slave中选举一个出来当作 master ,假如说我们选了slave1

现在有一个锁A进行加锁,正好加锁到 master上,然后 master 还没有同步到 slave 上,master 就宕机了,此时,后面在来新的线程获取锁A,也是可以加锁成功的,所以分布式锁也就失效了。

Redisson 框架为了解决这个问题,提供了一个专门的类,就是 RedissonRedLock,使用 RedLock 算法。

RedissonRedLock 解决问题的思路就是多搭建几个独立的 Redisson 集群,采用分布式投票算法,少数服从多数这种。假如有5Redisson 集群,只要当加锁成功的集群有5/2+1个节点加锁成功,意味着这次加锁就是成功的。

1、搭建几套相互独立的 Redis 环境,我们这里搭建5套。

2、每套环境都有一个 redisson node 节点。

3、多个 redisson node 节点组成 RedissonRedLock

4、环境包括单机、主从、哨兵、集群,可以一种或者多种混合都可以。

我们这个例子以主从为例来说

RedissonRedLock 加锁过程如下

1、向当前5Redisson node 节点加锁。

2、如果有3个节点加锁成功,那么整个 RedissonRedLock 就是加锁成功的。

3、如果小于3个节点加锁成功,那么整个加锁操作就是失败的。

4、如果中途各个节点加锁的总耗时,大于等于设置的最大等待时间,直接返回加锁失败。

通过上面这个示例可以发现,使用 RedissonRedLock 可以解决多个示例导致的锁失效的问题。但是带来的也是整个 Redis 集群的管理问题。

1、管理多套 Redis 环境

2、增加加锁的成本。有多少个 Redisson node就需要加锁多少次。

由此可见、在实际的高并发业务中,RedissonRedLock 的使用并不多。

在分布式系统中,CAP 理论应该都是知道的,所以我们在选择分布式锁的时候也可以参考这个。

  • C(Consistency) 一致性

  • A(Acailability) 可用性

  • P(Partition tolerance)分区容错性

所以如果我们的业务场景,更需要数据的一致性,我们可以使用 CP 的分布式锁,例子 zookeeper

如果我们更需要的是保证数据的可用性,那么我们可以使用 AP 的分布式锁,例如 Redis。

其实在我们绝大多数的业务场景中,使用Redis已经可以满足,因为数据的不一致,我们还可以使用 BASE 理论的最终一致性方案解决。因为如果系统不可用了,对用户来说体验肯定不是那么好的。

如果这篇文章对您有所帮助,或者有所启发的话,帮忙点个关注一下,您的支持是我坚持写作最大的动力。

求一键三连:点赞、转发、在看。

wx 搜索《醉鱼Java》,回复面试、获取2024面试资料

参考链接:https://mp.weixin.qq.com/s/EhucmYblfrRxbAuJTdPlfg

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/601341.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

高效测试丨怿星RTP协议测试解决方案

近几年&#xff0c;车内音视频娱乐系统不断发展&#xff0c;功能不断丰富&#xff0c;对于音视频的传输需求也逐渐增多&#xff0c;随着车载以太网的日渐成熟&#xff0c;各主机厂逐步方案落地、成本逐步降低&#xff0c;基于车载以太网的音视频传输也在逐步应用&#xff0c;常…

如何远程监控员工的电脑

如何远程监控员工的电脑 为什么要对员工使用电脑的行为进行监控呢&#xff1f;对企业来说是有其必要性的。其必要性主要体现以下4个方面。 1.数据安全与知识产权保护 防止数据泄露&#xff1a;企业内部往往存储着大量的敏感信息&#xff0c;如客户数据、财务资料、商业秘密、…

鸿蒙ArkUI开发实战:制作一个【简单计数器】

构建第一个页面 使用文本组件 工程同步完成后&#xff0c;在 Project 窗口&#xff0c;点击 entry > src > main > ets > pages &#xff0c;打开 Index.ets 文件&#xff0c;可以看到页面由 Row 、 Column 、 Text 组件组成。 index.ets 文件的示例如下&#xff1…

Qt+VS2019中使用QAxObject时的环境配置

在纯Qt中 在.pro中添加axcontainer模块即可 而VSqt中&#xff1a; 特别傻的是&#xff1a;我运行的是release&#xff0c;但配置的是debug的属性页&#xff0c;一直报错&#xff0c;人都傻了。 最后发现果然是人傻。

《2024工业软件行业软件授权方案使用情况调研报告》发布

深度解析行业应用与趋势 随着制造业的转型升级和数字化、智能化的加速推进&#xff0c;工业软件行业市场规模持续增长的同时也推动着行业技术和商业模式的创新。软件供应商需要根据市场需求和自身特点来制定合适的授权策略和商业模式&#xff0c;以实现软件产品的最大化商业价值…

算法练习第12天|● 239. 滑动窗口最大值● 347.前 K 个高频元素

239.滑动窗口的最大值 力扣原题 题目描述&#xff1a; 给你一个整数数组 nums&#xff0c;有一个大小为 k 的滑动窗口从数组的最左侧移动到数组的最右侧。你只可以看到在滑动窗口内的 k 个数字。滑动窗口每次只向右移动一位。 返回 滑动窗口中的最大值 。 示例 1&#xff…

Excel返回数值的各个位数值

运行结果 千位 如下图 INT(MOD(A2,10000)/1000) 百位 如下图 INT(MOD(A2,1000)/100) 十位 如下图 INT(MOD(A2,100)/10) 个位 如下图 MOD(A2,10) 特此记录 anlog 2024年4月8日

JavaScript - 请你说一说对随机数的理解

难度级别:初级及以上 提问概率:40% 在前端开发中,随机数的应用场景非常多,而且也是一个常见的考点。例如网页登录的验证码,看似只有4个随机数字加字母的组合,其实这也是随机数的范畴;例如在抽奖算法中,可以用随机数确定用户中奖的概率…

蓝桥杯每日一题:有序分数(递归)

给定一个整数 N&#xff0c;请你求出所有分母小于或等于 N&#xff0c;大小在 [0,1] 范围内的最简分数&#xff0c;并按从小到大顺序依次输出。 例如&#xff0c;当 N5 时&#xff0c;所有满足条件的分数按顺序依次为&#xff1a; 0/1,1/5,1/4,1/3,2/5,12/,35,2/3,3/4,4/5,1/…

类,构造,this,static

第1关&#xff1a;什么是类&#xff0c;如何创建类 100 任务要求参考答案 任务描述相关知识 什么是类怎么定义类创建对象并且使用对象的属性和方法编程要求测试说明 任务描述 本关任务&#xff1a;创建一个类和一个对象&#xff0c;调用这个对象的属性和方法。 相关知识 …

Linux虚拟主机中如何进行扫描检查恶意软件

看到论坛中有网友留言想要知道Linux虚拟主机上是否有扫描检测工具可以检测病毒文件或者恶意软件的。因为想要知道是否有此功能&#xff0c;以便他后去购买产品是可以更清晰的去咨询以及了解。正如这边是有使用Hostease提供商的Linux 虚拟主机&#xff0c;而cPanel中如何进行扫描…

使用nodejs搭建脚手架工具并发布到npm中

使用nodejs搭建脚手架工具并发布到npm中 一、安装环境依赖及脚手架搭建过程二、搭建Monorepo 风格的脚手架工程三、脚手架的必备模块命令参数模块获取命令参数设置子命令用户交互模块文件拷贝模块脚手架中的路径处理目录守卫文件拷贝模块动态文件生成模块mustache简介自动安装依…