目录
- 前言
- 课程内容
- 一、一个案例引发的思考
- 二、Redis分布式锁的演进
- 2.1 单纯使用Redis的setnx实现分布式锁
- 2.2 setnx + 过期时间
- 3.3 Redisson实现分布式锁:setnx + 过期时间 + 锁续命
- 三、Redisson客户端实现的分布式锁源码分析
- 4.1 RedissonLock#lock():加锁
- 4.2 RedissonLock#tryAcquire():尝试获取
- 学习总结
- 感谢
前言
Redis中间件,非常推荐大家学的一个东西。甚至这么说,Redis也许是我们Java程序员,能接触到的分布式、微服务中间件中一个较为高级,但又比较接地气的中间件了。为什么接地气?因为哪怕是在小项目中,Redis都是一个比较常用、可靠的中间件!
但是我发现,新手用Redis缓存很容易钻入一个牛角尖,那就是Redis会不会崩啊?万一哪一天断电了,宕机了怎么办呢?数据是不是就没了啊?最后得到一个结论:Redis不可靠啊!!! 但现实是,博主当前所在的小公司小项目Redis生产环境运行2年没蹦过。而且我们那点小体量,就算是崩了也无所谓,重启就行了(事实上,大公司大项目都会使用Redis集群解决这个问题)。但这话说的不严谨,其实关于Redis不可靠问题,正是我在前一篇文章说的【Redis主从架构】、【Redis集群】要解决的问题,人家Redis对大家的质疑早已经给出方案了。一句话:切勿讳疾忌医啊同学们。
课程内容
一、一个案例引发的思考
先假设,我们当前线上有一个项目,使用nginx
分别轮循到2个tomcat上。它的模型如下:
如上图,为了减缓节点压力,我们把项目部署成了2个tomcat,分别是8080端口和8081端口。并且采用的是轮询策略,客户端每次过来一条请求,将按序依次分流到这2个tomcat上。
然后,这个tomcat项目提供了如下这个接口:
@RequestMapping("/deduct_stock")public String deductStock1() {int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock")if (stock > 0) {int realStock = stock - 1;stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)System.out.println("扣减成功,剩余库存:" + realStock);} else {System.out.println("扣减失败,库存不足");}return "end";}
案例分析:这个案例很简单,就是提供一个扣减库存的接口,模拟外部电商系统购买物品之后,扣减库存。但是,大家一定要注意到以下几个点:
- 无论我们部署多少个tomcat,库存肯定是被共享的。假设,我只有100个优惠产品,那肯定只能被卖出去100件
- 我i们把库存量放到了redis中去了,每卖出去一件,
stock - 1
,并且写回缓存
但事实上,上面这个代码是有问题的。不知道大家能不能理解到?这个对于有经验的朋友来说可能洒洒水而已,很简单。为了照顾萌新,我这边画个图吧。
上图是单个请求(线程)扣减库存的UML时序图(可能画的不标准,别在意,意思到了就好)。单个线程之下,如果请求都是串行的,也就是上一条执行完了,下一条继续进来请求扣减库存,那当然没问题。但是同学们啊,我们这里是多线程、多个tomcat的分布式环境,所以不出意外,你在生产环境会遇到下面这种情况:
我想,我上图已经画的很清楚了。当这里有另一个客户端请求进来的时候,并且请求顺序跟上面一样,情况显然就开始不对了,出现了【超卖】问题(两个客户端都扣减了一次库存,但是写回都是:99)。为了提点一下小白,上述的并发编程思想,我尽量再点一下:
- 一个http请求,代表着一个线程,所以,同一时刻不同的http请求,肯定是两条不同的线程
- 由于线程之间的隔离,每个线程都会保存一个变量副本,即:上图中客户端1跟客户端2,都会各自保存一个stock变量的副本,各自进行加减操作,然后再把结果99写回redis,这肯定是不对的。因为我们知道,这里本应该是98
好了,既然【超卖】问题已经出现了,那上面的问题怎么解决呢?下面,我们就好好研究一下,这个解决方式的演进。
二、Redis分布式锁的演进
一个很正常的思路,对于这种资源共享问题,多线程竞争问题,我想很多同学会想:那就加个锁呗。于是,有朋友提出了:synchronized
锁住代码块,嘿嘿,如下所示:
@RequestMapping("/deduct_stock")public String deductStock1() {synchronized (this) {int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock")if (stock > 0) {int realStock = stock - 1;stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)System.out.println("扣减成功,剩余库存:" + realStock);} else {System.out.println("扣减失败,库存不足");}}return "end";}
咱先别讨论【锁粒度】问题,是不是真的有朋友想着用上面这种方式解决的呢?
这么说,上面这种方式在一个tomcat下,单进程的时候,是有效的,但是大伙忘了我们当前的环境,2个tomcat,分布式环境啊!你在tomcat1加synchronized
锁,我tomcat2是感知不到的!所以得换个方式。
有经验的朋友可能已经想到了,利用redis io跟命令处理是单线程的特性,所以可以使用setnx key value
实现分布式锁,没错,这就是我们现在要讲的东西。下面我们将开始演进,利用Redis实现分布式锁需要解决的问题。这边用的Redis客户端(工具)是StringRedisTemplate
,具体使用方式这边就不介绍了。
2.1 单纯使用Redis的setnx实现分布式锁
改正后的代码如下:
@RequestMapping("/deduct_stock")public String deductStock1() {String lockKey = "lock:product_101";// 使用redis的setnx命令Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "zhuge");if (!result) {return "error_code";}try {int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock")if (stock > 0) {int realStock = stock - 1;stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)System.out.println("扣减成功,剩余库存:" + realStock);} else {System.out.println("扣减失败,库存不足");}} finally {stringRedisTemplate.delete(lockKey);}return "end";}
这边很简单,就是使用了stringRedisTemplate.opsForValue().setIfAbsent
,即:Redissetnx
命令。然后,如果Redis返回的result
不是true
,那就返回一个错误码,提示客户端【上锁失败】就好了。那同学们,这样就行了吗?我们画个图吧,嘿嘿嘿
就像上图这样,显然,从目前来看,是没问题,确实已经实现了,多个tomcat情况下,都能控制共享资源了。但是,万一,真的出现了客户端1在拿到锁之后,还没走到释放锁的代码就宕机了,那完了,资源没办法被释放!怎么办?难道我手动删除不成?这就是,单纯利用setnx
会遇到的第一个问题:死锁。
哈,我想到了这里,敏锐的同学发现了,既然有这种问题,那我给个过期时间不就行了吗?对啊,给过期时间啊,行得通!
2.2 setnx + 过期时间
对于这个方案,其实有2个版本,我先说第一个错误版本:
@RequestMapping("/deduct_stock")public String deductStock1() {String lockKey = "lock:product_101";Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "zhuge");stringRedisTemplate.expire(lockKey, 10, TimeUnit.SECONDS);if (!result) {return "error_code";}try {int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock")if (stock > 0) {int realStock = stock - 1;stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)System.out.println("扣减成功,剩余库存:" + realStock);} else {System.out.println("扣减失败,库存不足");}} finally {stringRedisTemplate.delete(lockKey);}return "end";}
跟上述代码一样,我们在setIfAbsent
之后,加一个过期时间函数expire
。这个方案其实是不行的。很显然,目前这两步操作不是【原子性】的,Java代码嘛,肯定是一条一条按顺序执行的,就跟上面的例子一样,当我们出现极端情况,诶,还真就执行完setIfAbsent
之后,expire
之前宕机了呢?一样完犊子,会出现死锁,所以,正常我们是利用setIfAbsent
另一个重载方法,它会帮我们【原子】地操作这两步,如下:
@RequestMapping("/deduct_stock")public String deductStock1() {String lockKey = "lock:product_101";Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "zhuge", 30, TimeUnit.SECONDS);if (!result) {return "error_code";}try {int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock")if (stock > 0) {int realStock = stock - 1;stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)System.out.println("扣减成功,剩余库存:" + realStock);} else {System.out.println("扣减失败,库存不足");}} finally {stringRedisTemplate.delete(lockKey);}return "end";}
好了,目前是原子性的了。并且呢,我们给这个锁,+了一个30秒的过期时间。可以了吗?啊,不完全对。大家能想明白吗?
很显然啊,你这个过期时间是固定30秒的,万一我业务30秒内完成不了呢?嘿,你是不是想说什么业务30秒完成不了,哈,真可能出现,比如IO阻塞了什么的。
那有朋友会继续建议:那我设置60秒?120秒?240秒?丢,我设置超长时间,总行了吧???咳咳咳,啊这个,有点道理的
可如果,我拿出:我不管,你锁多久我业务执行时间永远比你多1秒,阁下该如何应对呢?
哈哈,你是不是想说我无理取闹。好吧,我不抬杠了,我提出一个比较合理的问题哦:如果你设置的过期时间比较长假设5分钟,万一这时候真的宕机了呢?等过期时间到期吗?天啊,如果此时有数百万个请求进来,你是不是想让人等你5分钟自动解锁啊?秒杀时刻宕机5分钟我原谅你,商家能原谅你吗?
上述说问题的就是由于过期时间比较长,造成的整体【拒绝服务】问题,这是问题一。
其实时间设置过短,我想大家也能想得到会出现什么问题,可能业务没执行完就释放锁了,最后锁形同虚设,其他请求一样进来了,到时候又出现了跟最开始说的情景。这是问题二。
另外还想补充一点,时间设置过短其实还会出现一个很有意思的现象,这里我们画个图给大家看看:
由于客户端A在执行业务期间锁就过期了,此时,客户端B进来加锁肯定是能成功的。但是客户端A在没有出现错误的情况,肯定会继续执行下去的,并且最终会释放锁。那最终这个释放锁释放的是谁的锁呢?客户端B的呀!此时,又有一个新的客户端C过来加锁,那不是成功了吗?显然这样做是有问题的,【错误释放别人的锁】,并且自己的业务还不一定执行完了!
其实针对这个问题,还是有解决方案的。那就是每次上锁的时候,+一个uuid,最后释放锁的时候判断一下uuid是不是跟当前的uuid一样就好了。如下:
@RequestMapping("/deduct_stock")public String deductStock1() {String lockKey = "lock:product_101";String clientId = UUID.randomUUID().toString();Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, clientId, 30, TimeUnit.SECONDS);if (!result) {return "error_code";}try {int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock")if (stock > 0) {int realStock = stock - 1;stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)System.out.println("扣减成功,剩余库存:" + realStock);} else {System.out.println("扣减失败,库存不足");}} finally {if (clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))) {stringRedisTemplate.delete(lockKey);}}return "end";}
关键代码如上:最后finaly
块判断释放的时候,里面的value
值是不是当初我们设置的那个。但其实这仅仅只是解决了我们其中一个问题而已。还有个关键的【拒绝服务】问题呢。追根揭底,还是【锁时间】到底该如何确定的问题。
于是有人提出了一个方案:锁续命。顾名思义,就是设置一个相对不那么长的时间,但是临到期前或者某个时间点,重新设置过期时间。
3.3 Redisson实现分布式锁:setnx + 过期时间 + 锁续命
这里就要开始介绍,基于Redisson实现的一个分布式锁方案了。首先,要使用Reddison,需要先引入jar包,pom.xml
如下:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>2.9.0</version></dependency><dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.6.5</version></dependency>
代码示例:
@RequestMapping("/deduct_stock")
public String deductStock1() {String lockKey = "lock:product_001";//获取锁对象RLock redissonLock = redisson.getLock(lockKey);//加分布式锁redissonLock.lock(); // .setIfAbsent(lockKey, clientId, 30, TimeUnit.SECONDS);try {int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); if (stock > 0) {int realStock = stock - 1;stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)System.out.println("扣减成功,剩余库存:" + realStock);} else {System.out.println("扣减失败,库存不足");}} finally {//解锁redissonLock.unlock();}return "end";}
上面的代码很简单,我们无需关心之前提到的那几个问题了,Redisson在封装的api里面已经帮我们做好了一切。我们只需要简单的调用lock
跟unlock
而已。
三、Redisson客户端实现的分布式锁源码分析
本次源码分析的入口,就是【3.3】中最后给出的代码示例redissonLock.lock()
。为了方便大家理解,这里我们给出这个源码实现的原理图:
整一块关键源码,其实主要涉及的是四个函数。附上摘抄自我风哥的源码流程图,大家可以跟着源码流程图看一下源码。整体来说比较简单:
4.1 RedissonLock#lock():加锁
我们深入上面的lock()
函数会发现,其实真正调用的是:lockInterruptibly()
,我们就只贴这个关键部分的源码了。如下:
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {// 线程idlong threadId = Thread.currentThread().getId();// 尝试获取锁,这部分是获取锁的关键方法Long ttl = this.tryAcquire(leaseTime, unit, threadId);// 不等于null说明获取锁失败if (ttl != null) {// redis订阅redisson_lock__channel + ":" + id + ":" + threadIdRFuture<RedissonLockEntry> future = this.subscribe(threadId);this.commandExecutor.syncSubscription(future);try {while(true) {// 再尝试获取一次锁ttl = this.tryAcquire(leaseTime, unit, threadId);if (ttl == null) {// 获取成功直接返回return;}// 大于等于0说明还没释放锁,通过semaphore阻塞时长为key的剩余有效时间if (ttl >= 0L) {this.getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {// 说明锁时间已到期可以尝试获取锁this.getEntry(threadId).getLatch().acquire();}}} finally {this.unsubscribe(future, threadId);}}}
源码解读:(涉及了不少锁的设计思想,用到了线程的一些知识,不懂的朋友可以看我前面的【并发专题】系列内容)
- 首先,代码刚进来会先调用一次
tryAcquire()
尝试获取锁,如果返回null则表示尝试获取锁成功(这里的代码也是一个关键点,我们会在下面讲解) - 获取不成功,则订阅该锁的解锁事件(使用了Redis内部的发布订阅功能,这个跟MQ差不多的东西)
- 这里也是一个关键点的开始。关键代码在一个
while(true)
里面执行的,所以叫做【自旋】。但是大家是不是以为,这样的操作会占满CPU呢??理论上是的,但是这里通过一些JUC的阻塞操作避免了这个问题 - 进入自旋,准备阻塞之前,依旧会尝试再次获取锁,为什么我要说【依旧】?如果大家看过我JUC的文章会知道,JUC很多锁在阻塞之前都会这么干,这是为了让线程进入阻塞之前,尽可能地,最好能拿到锁,因为阻塞线程会导致线程上下文切换,这是一个比较【重型】的操作
- 最最最关键的一部分代码,就是当加锁失败之后,这里有一个判断
ttl >= 0
的。如果>=0
成立,则调用getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS)
。我们点开就会发现,这里其实调用的是JUC中的Semaphore
信号量的tryAcquireSharedNanos
方法,会执行阻塞等待逻辑。线程阻塞会让出CPU,从而避免了前面提到的【自旋】占满CPU的问题 - 阻塞一段时间后重新唤醒,或者当锁被释放的时候也会被自动唤醒
4.2 RedissonLock#tryAcquire():尝试获取
源码如下:
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {return get(tryAcquireAsync(leaseTime, unit, threadId));}
然后进入tryAcquireAsync
逻辑,如下:
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {if (leaseTime != -1) {return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);}RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);ttlRemainingFuture.addListener(new FutureListener<Long>() {@Overridepublic void operationComplete(Future<Long> future) throws Exception {if (!future.isSuccess()) {return;}Long ttlRemaining = future.getNow();// lock acquiredif (ttlRemaining == null) {scheduleExpirationRenewal(threadId);}}});return ttlRemainingFuture;}
在这里,就出现了本次章节最最关键的2个核心方法了。
tryLockInnerAsync()
:尝试异步获取锁。如果大家还记得我们前面的推演的话,会知道,这里会调用setnx
+ 过期时间的操作(锁续命在下面)。关键代码如下:
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {internalLockLeaseTime = unit.toMillis(leaseTime);return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('hset', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"return redis.call('pttl', KEYS[1]);",Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));}
可以看到这里调用的正是lua
脚本,通过lua
脚本,保证了setnx
跟expire
的原子性。当然这里没有直接使用setnx
,而是使用exist + set
的方式。道理是一样的,不要太在意。我们这里主要是学习其设计思路跟思想
2. scheduleExpirationRenewal()
:锁续命。关键代码如下:
private void scheduleExpirationRenewal(final long threadId) {if (expirationRenewalMap.containsKey(getEntryName())) {return;}Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return 1; " +"end; " +"return 0;",Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));future.addListener(new FutureListener<Boolean>() {@Overridepublic void operationComplete(Future<Boolean> future) throws Exception {expirationRenewalMap.remove(getEntryName());if (!future.isSuccess()) {log.error("Can't update lock " + getName() + " expiration", future.cause());return;}if (future.getNow()) {// reschedule itselfscheduleExpirationRenewal(threadId);}}});}}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {task.cancel();}}
这里做一个简单的源码解析:
- 首先这里向一个内部【延迟线程池】
commandExecutor
提交了一个任务,并且设定延迟时间为internalLockLeaseTime / 3
。所以我们才在最开始的原理图上说,这是一个异步操作 - 我们点击定义会发现
internalLockLeaseTime
会被初始化为30秒,30/3=10
,所以这个延迟线程池会在10秒后再次执行回调异步方法,也就是我们提交的task - 异步回调方法中,首先还是添加一个线程池任务,并且返回一个
Future
对象,并且给Future
对象添加一个监听器(这里先不讲监听器逻辑,下一点讲)。我们看里面这个线程池任务的业务逻辑,先调用一段lua
脚本来判断,当前线程是否还持有锁,如果还持有,返回1(true),否则返回0(false) - 再看监听器的逻辑,如果future返回的结果是
ture
,则递归调用自己scheduleExpirationRenewal
,再次新增一个异步检测任务。就这样,实现了循环检测的【锁续命】
学习总结
- 学习了Redis分布式锁实现的演进过程
- 学习了基于Redis实现分布式锁需要解决的一些问题,及解决思路
感谢
感谢我风哥,作者【高如风】的文章《redis分布式锁详解》