1. 无锁场景
下面是一个扣减库存逻辑, 由于查库存和扣减库存两个操作不是原子的,明显存在并发超卖问题
// 假设初始库存200@GetMapping("/stock")public String stock(@RequestParam(value = "name", defaultValue = "World") String name) {String key = "product:101";Integer stock = Integer.valueOf(redisTemplate.opsForValue().get(key));if (stock > 0) {stock = stock - 1;redisTemplate.opsForValue().set(key, stock.toString());System.out.println("成功扣减库存, 还剩" + stock);} else {throw new RuntimeException("缺货");}return "200";}
压测结果: 1000人抢200库存商品, 卖出731件,存在超卖问题
2. 单机环境,加synchronized锁
private static Object STOCK_LOCK = new Object();// 假设初始库存200@GetMapping("/stock")public String stock(@RequestParam(value = "name", defaultValue = "World") String name) {String key = "product:101";synchronized (STOCK_LOCK) {Integer stock = Integer.valueOf(redisTemplate.opsForValue().get(key));if (stock > 0) {stock = stock - 1;redisTemplate.opsForValue().set(key, stock.toString());System.out.println("成功扣减库存, 还剩" + stock);return "200";}}throw new RuntimeException("缺货");}
压测结果:1000人抢200库存商品, 卖出200件,用例成功
3. 分布式环境,加synchronized锁
准备:这里启动两个节点, 用nginx负载均衡
压测结果:1000人抢200库存商品, 卖出310件,存在超卖问题
4. 分布式环境,redis setnx分布式锁
基础版
主要代码逻辑:
- 用setIfAbsent(setnx封装)加锁,同时设置超时时间,锁力度到具体商品
- 获取锁后执行减库存逻辑
- 执行成功释放锁
代码:
// 假设初始库存200@GetMapping("/stock2")public String stock2(@RequestParam(value = "name", defaultValue = "World") String name) {String key = "product:101";String lockKey = "lock:" + key;Boolean result = redisTemplate.opsForValue().setIfAbsent(lockKey, "1", 30, TimeUnit.SECONDS);if (result) {try {Integer stock = Integer.valueOf(redisTemplate.opsForValue().get(key));if (stock > 0) {stock = stock - 1;redisTemplate.opsForValue().set(key, stock.toString());System.out.println("成功扣减库存, 还剩" + stock);return "200";}} finally {redisTemplate.delete(lockKey);}}throw new RuntimeException("缺货");}
压测结果:1000人抢200库存商品, 卖出182件,剩余库存18件,业务正常
在低并发,服务器理想情况下, 业务正常,但是还存在一些问题
问题1
现在写死的锁过期时间30秒,但是在服务器压力大时, 接口耗时不稳定, 可能超过过期时间, 锁自动失效, 可能导致超卖
解决:锁续命, 开启一个后台线程, 如果业务没执行完,给锁延长过期时间.
问题2
A线程业务执行完, 准备释放锁时, 肯能刚好锁自动过期,这时候B线程进来抢占到锁正在执行业务,A线程开始删除锁, 此时其他线程都可能去拿到锁,保证不了同步
解决: 释放锁时,判断只有加锁线程才有资格去删除锁
@GetMapping("/stock3")public String stock3(@RequestParam(value = "name", defaultValue = "World") String name) {String key = "product:101";String lockKey = "lock:" + key;String clientId = UUID.randomUUID().toString();Boolean result = redisTemplate.opsForValue().setIfAbsent(lockKey, clientId, 30, TimeUnit.SECONDS);if (result) {try {Integer stock = Integer.valueOf(redisTemplate.opsForValue().get(key));if (stock > 0) {stock = stock - 1;redisTemplate.opsForValue().set(key, stock.toString());System.out.println("成功扣减库存, 还剩" + stock);return "200";}} finally {// 只能删除自己加的锁, 不让其他线程删if (clientId.equals(redisTemplate.opsForValue().get(lockKey))) {/* ... */redisTemplate.delete(lockKey);}}}throw new RuntimeException("缺货");}
问题3
但是问题2还没彻底解决, 因为比较clientId和删除锁这两个操作不是原子的, 如果中间卡顿,卡顿期间锁刚好自动过期,其他线程占有锁, 这里再执行删除锁就会误删别人锁.
解决: 可用lua脚本执行批量命令,保证原子性
Redisson分布式锁
Redisson是专门处理分布式场景使用Redis的组件, 里面就封装了锁续命,只删自己加的锁,lua脚本,锁重入等功能.
示例:
@Beanpublic Redisson redisson(RedisProperties redisProperties) {// 此为单机模式Config config = new Config();config.useClusterServers().setNodeAddresses(redisProperties.getCluster().getNodes().stream().map(node -> "redis://" + node).collect(Collectors.toList()));return (Redisson) Redisson.create(config);}@Autowiredprivate Redisson redisson;// 假设初始库存200@GetMapping("/stock4")public String stock4(@RequestParam(value = "name", defaultValue = "World") String name) {String key = "product:101";String lockKey = "lock:" + key;RLock rLock = redisson.getLock(lockKey);// 尝试加锁, 加锁失败会间歇阻塞再次加锁, 直至成功rLock.lock();try {Integer stock = Integer.valueOf(redisTemplate.opsForValue().get(key));if (stock > 0) {stock = stock - 1;redisTemplate.opsForValue().set(key, stock.toString());System.out.println("成功扣减库存, 还剩" + stock);return "200";}} finally {rLock.unlock();}throw new RuntimeException("缺货");}
压测结果:1000人抢200库存商品, 卖出200件,用例成功