文章目录
- 优惠券秒杀
- 全局ID生成器
- 优惠券秒杀下单
- 超卖问题
- 一人一单
- 分布式锁
- 基于Redis的setnx指令实现分布式锁
- 解决锁误删问题
- 基于Lua脚本实现多条指令原子性
- Redis调用Lua脚本
- Java中使用Lua脚本
- Redisson
- Redisson快速入门
- Redisson可重入锁原理
- Redisson的锁重试和Watchdog机制
- Redisson的multilock
- 秒杀优化
- Redis缓存解耦
- Redis消息队列
- 基于List结构
- 基于PubSub的消息队列
- 基于Stream的消息队列
- 基于Stream的消息队列 - 消费者组
优惠券秒杀
全局ID生成器
- 第一位为符号位,永远为0
- 2-32位为时间戳差值,指定从某一个时刻开始,计算当前的时间戳与起始时间戳的差值,保证了id的自增性,但不一定是连续的。
- 后32位,可以采用分区+序列号的方式。(分布式)
本质上跟mybatis-plus的雪花算法是一样的。
public class RedisIdWorker {/*** 开始时间戳*/private static final long BEGIN_TIMESTAMP = 1640995200L;/*** 序列号的位数*/private static final int COUNT_BITS = 32;private final StringRedisTemplate stringRedisTemplate;public RedisIdWorker(StringRedisTemplate stringRedisTemplate) {this.stringRedisTemplate = stringRedisTemplate;}public long nextId(String keyPrefix) {// 1.生成时间戳LocalDateTime now = LocalDateTime.now();long nowSecond = now.toEpochSecond(ZoneOffset.UTC);long timestamp = nowSecond - BEGIN_TIMESTAMP;// 2.生成序列号// 2.1.获取当前日期,精确到天String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));// 2.2.自增长// 此处的警告可以忽略,因为如果key不存在,会从0开始增长。// 这里key前缀以日期构建的目的是为了避免自增超出序列号范围,同一天下单数量一定不会超出32位long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);// 3.拼接并返回// timestamp 左移 32位,变到高位,那么低32位为0,或上count即可,count一定不会超出32位范围。return timestamp << COUNT_BITS | count;}
}
测试:
编写一个runnable 的任务task,循环100次,执行自增id测试。
构建一个固定工作线程数为300的线程池,循环将线程池中提交task。那么最终相当于是自增id 3万(100 * 300)次
使用CountDownLatch来帮助计时,因为我们用到了线程池,线程池的执行是异步的,因此简单使用end - begin,当执行到end时,可能还有未执行完毕的异步线程。
而使用CountDownLatch,则可以帮助我们标记异步线程,latch.await();会等待所有异步线程执行完毕。
@Resource
private RedisIdWorker redisIdWorker;private final ExecutorService es = Executors.newFixedThreadPool(500);@Test
public void testIdWorker() throws InterruptedException {CountDownLatch latch = new CountDownLatch(300);Runnable task = () -> {for (int i = 0; i < 100; ++i) {long id = redisIdWorker.nextId("order");System.out.println("id:" + id);}latch.countDown();};long begin = System.currentTimeMillis();for (int i = 0; i < 300; ++i) {es.submit(task);}latch.await();long end = System.currentTimeMillis();System.out.println("time cost:" + (end - begin));
}
优惠券秒杀下单
超卖问题
一个线程查询有库存,尚未扣除库存,另外一个线程也执行了库存查询,由于此外前面的线程还没来得及扣除库存,因此后来的线程也可以执行下单。
使用乐观锁解决超卖问题,仅当数据库更新操作时,剩余库存依旧大于0,才执行成功。
public class IVoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {@Resourceprivate ISeckillVoucherService seckillVoucherService;@ResourceRedisIdWorker redisIdWorker;// 因为设计两张表操作,使用事务保证操作连续性@Transactional@Overridepublic Result secKillVoucher(Long voucherId) {// 1. 查询优惠券SeckillVoucher voucher = seckillVoucherService.getById(voucherId);// 2. 判断秒杀是否开始if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {return Result.fail("秒杀尚未开始!");}// 3. 判断秒杀是否结束if (voucher.getEndTime().isBefore(LocalDateTime.now())) {return Result.fail("秒杀已经结束!");}// 4. 判断库存是否充足if (voucher.getStock() < 1) {// 库存不足return Result.fail("库存不足!");}// 5. 扣减库存// 使用乐观锁解决超卖问题,仅当数据库更新操作时,剩余库存依旧大于0,才执行成功。boolean success = seckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).gt("stock", 0).update();if (!success) {return Result.fail("库存不足!");}// 6. 创建订单VoucherOrder voucherOrder = new VoucherOrder();// 6.1 创建订单id,使用全局生成器long orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);// 6.2 用户idLong userId = UserHolder.getUser().getId();voucherOrder.setUserId(userId);// 6.3 代金券idvoucherOrder.setVoucherId(voucherId);save(voucherOrder);// 7. 返回订单idreturn Result.ok(orderId);}
}
一人一单
需求:修改优惠券秒杀业务,同一个用户只能下一单。
主要问题:
1、为了保证一人一张,需要根据用户id和和优惠券id查询是否已经下单过,该过程需要上锁避免线程安全问题。
2、锁对象可以是用户id的字符串形式,保存在常量池中。
3、锁的范围应该在事务提交之后,因此最好将整个方法上锁。
4、掉用本类中的方法,可能导致事务失效,解决方案是使用代理对象中的方法。
(1)添加依赖:
<dependency><groupId>org.aspectj</groupId><artifactId>aspectjweaver</artifactId>
</dependency>
(2)启动类中暴露代理对象给spring容器:
(3)使用容器中的代理执行方法。
synchronized (userId.toString().intern()) {// 防止事务失效IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucherOrder(voucherId);}
完整逻辑如下:
public class IVoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {@Resourceprivate ISeckillVoucherService seckillVoucherService;@ResourceRedisIdWorker redisIdWorker;@Overridepublic Result secKillVoucher(Long voucherId) {// 1. 查询优惠券SeckillVoucher voucher = seckillVoucherService.getById(voucherId);// 2. 判断秒杀是否开始if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {return Result.fail("秒杀尚未开始!");}// 3. 判断秒杀是否结束if (voucher.getEndTime().isBefore(LocalDateTime.now())) {return Result.fail("秒杀已经结束!");}// 4. 判断库存是否充足if (voucher.getStock() < 1) {// 库存不足return Result.fail("库存不足!");}//用户idLong userId = UserHolder.getUser().getId();// 使用.intern(),使得从字符串常量池中取得唯一的id对象,作为锁对象synchronized (userId.toString().intern()) {// 防止事务失效IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucherOrder(voucherId);}}// 因为设计两张表操作,使用事务保证操作连续性@Transactionalpublic Result createVoucherOrder(Long voucherId) {// 5 一人一单Integer count = query().eq("user_id", UserHolder.getUser().getId()).eq("voucher_id", voucherId).count();if (count > 0) {// 用户已经秒杀过优惠券return Result.fail("用户已经购买过一次!");}// 6. 扣减库存// 使用乐观锁解决超卖问题,仅当数据库更新操作时,剩余库存依旧大于0,才执行成功。boolean success = seckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).gt("stock", 0).update();if (!success) {return Result.fail("库存不足!");}// 7. 创建订单VoucherOrder voucherOrder = new VoucherOrder();// 7.1 创建订单id,使用全局生成器long orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);// 7.2 用户idLong userId = UserHolder.getUser().getId();voucherOrder.setUserId(userId);// 7.3 代金券idvoucherOrder.setVoucherId(voucherId);save(voucherOrder);// 8. 返回订单idreturn Result.ok(orderId);}
}
上述方案在集群模式下依旧会有问题,因为锁对象是字符串常量池中的用户id,集群模式下,不同的服务器会有不同的JVM,因此锁对象就不唯一了。
解决方案就是使用分布式锁。
分布式锁
分布式锁:满足分布式系统或集群模型下,多进程可见并且互斥的锁。
常见的分布式锁实现方案有三种:
- 基于MySQL本身的互斥锁机制
- 基于Redis的setnx这样的互斥命令
- 基于Zookeeper利用节点的唯一性和有序性
基于Redis的setnx指令实现分布式锁
假定服务器集群共用一个第三方的Redis,那么就可以在Redis上,使用一个lock
为key,threadid
为值的键值对来表示锁对象。
模拟获取锁:
- 保证互斥,确保只能有一个线程获取到锁
- 非阻塞:尝试一次,成功返回true,失败返回false
- 为了避免释放锁的操作失败,导致后序永远无法获取到锁,应该为锁设置有效期,逾期自动释放。
Redis命令:
set lock threadId nx ex 10
模拟释放锁:
直接删除 lock
即可
del lock
在Java中实现如下,注意点为:
- 准备Redis操作需要的
StringRedisTemplate
,为了不同的业务使用不同的锁,应该在锁对象的key上加上业务名称name
,这两个变量通过构造函数传入。 - 模拟获取锁函数
tryLock()
,返回布尔值,代表是否成功获取到锁,可指定锁的TTL。.setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
,指定锁的键值对,key为lock前缀 + 业务名,值为线程id。 - 模拟释放锁
unlock()
,直接根据key删除代表锁的键值对。
public class SimpleRedisLock implements ILock {private String name;private StringRedisTemplate stringRedisTemplate;public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {this.name = name;this.stringRedisTemplate = stringRedisTemplate;}private static final String KEY_PREFIX = "lock:";@Overridepublic boolean tryLock(long timeoutSec) {// 获取线程标示String threadId = Thread.currentThread().getId();// 获取锁Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);return Boolean.TRUE.equals(success);}@Overridepublic void unlock() {// 释放锁stringRedisTemplate.delete(KEY_PREFIX + name);}
}
解决锁误删问题
上边版本的分布式锁实现,有可能出现锁误删的问题,具体情形如下:
- 线程1获取到锁,因为业务阻塞,导致阻塞时间长于锁自动释放时间。
- 线程2在锁自动释放后,获取到锁,执行业务,在执行过程中,线程1完成业务,释放锁,但此时Redis中的锁已经是由线程2创建的锁对象了,而被线程1删除了。
- 线程1删除了锁,因此线程3可以继续获取到锁,那么此时线程2和线程3已经是并行执行了,违反了锁的互斥性!!!。
那么解决办法就是在删除锁字段,即释放锁的时候,检查一下,当前的锁释放是之前自己获取到的锁!!。
主要的修改有两处:
- 获取锁的时候,存入线程唯一标识,由于集群情况下,不同集群的不同线程id可能一样,采用UUID来拼接线程id,构,保证标识唯一性。
- 在释放锁的时候,判断是否与当前线程标识一致,如果不一致,则不释放锁(避免误删别的线程的锁)。
private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";
@Overridepublic boolean tryLock(long timeoutSec) {// 获取线程标示String threadId = ID_PREFIX + Thread.currentThread().getId();// 获取锁Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);return Boolean.TRUE.equals(success);}
@Overridepublic void unlock() {// 获取线程标示String threadId = ID_PREFIX + Thread.currentThread().getId();// 获取锁中的标示String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);// 判断标示是否一致if(threadId.equals(id)) {// 释放锁stringRedisTemplate.delete(KEY_PREFIX + name);}}
基于Lua脚本实现多条指令原子性
判断锁标识是否一致和释放锁不是原子性的,在这个间隙,可能再次导致线程安全问题。
解决方法是借助lua脚本来保证指令执行的原子性。
Redis调用Lua脚本
- Redis使用
EVAL
可以用于执行脚本,Lua脚本中使用redis.call()
,可以用于执行Redis指令。 - 使用
EVAL
指令时,可以指定脚本需要操作的key类型的参数个数,后边跟上keys列表和argv列表,这样在脚本中就可以直接使用传入的参数。需要注意的是在Lua脚本中,数组索引下标从1开始,因此KEYS[1]就表示name, 而ARGV[1]就表示Rose
Java中使用Lua脚本
1 、在Resource目录下编写unlock.lua脚本:
2、配置Redis脚本调用对象DefaultRedisScript
,指定脚本路径和返回值类型。
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;static {UNLOCK_SCRIPT = new DefaultRedisScript<>();// 指定脚本路径UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));// 设置返回类型UNLOCK_SCRIPT.setResultType(Long.class);}
3、在unlock中使用stringRedisTemplate执行UNLOCK_SCTRIPT调用lua脚本保证操作的原子性。
@Override
public void unlock() {// 调用lua脚本stringRedisTemplate.execute(UNLOCK_SCRIPT,Collections.singletonList(KEY_PREFIX + name),ID_PREFIX + Thread.currentThread().getId());
}
Redisson
基于Lua脚本优化后的Redis分布式锁已经能够满足大部分场景下的业务需求,然而它还是具有一些不足:
- 1、锁不可重入
- 2、获取锁,不可重试
- 3、超时释放虽然避免了死锁,但是业务执行耗时较长,也会导致锁释放,存在安全隐患。
- 4、主从一致性,如果Redis提供了主从集群(读操作,使用从节点,写操作使用主节点),那么主从同步是存在延时的,当主服务器宕机,从节点尚未同步时,则会出现锁互斥失效。
为了实现上述这些高级功能,我们可以借助,Redisson,一个基于Redis的分布式锁框架。
官网地址
Redisson快速入门
- 引入依赖
- 配置Redisson客户端,在配置类中使用@Bean注解,将Redisson客户端类注入到IoC容器,交由Spring管理。
- 使用Redisson的分布式锁
Redisson可重入锁原理
可重入的原理与synchronized这类可重入锁原理类似,在Redis中使用setnx,存放一个hash类型的数据,field为锁的值,value为当前获取锁的次数。
- 首先判断锁是否存在,如果不存在,获取锁并添加线程标识,设置锁的有效期。
- 如果锁已经存在,根据锁标识判断锁是否属于该线程,如果属于将锁计数+1,否则获取锁失败。
- 业务执行完毕时,将锁计数减1,当锁计数减为0时释放锁,否则重置锁的有效期。
- 上述逻辑需要保证原子性,因此,所有的操作应该使用Lua脚本来实现。
Redisson的锁重试和Watchdog机制
- Redisson分布式锁实现了尝试重新获取锁的功能,在尝试获取锁的时候,可以传入最大等待时间
wait_time
和锁自动释放时间lease_time
。 - 尝试获取锁时,如果获取锁成功返回null,否则返回剩余的最大等待时间pttl,以毫秒为时间单位。如果剩余最大等待时间大于0,那么会订阅并等待释放锁的信号。
- 相应的,锁在释放时,会发布锁释放的消息,所有订阅该消息的线程都会接收。接收到后,需要判断此时等待是否超时,如果超时,则锁获取失败,否则重新尝试获取锁。
- 如果锁自动释放时间不为-1, 那么在获取锁成功时,Redisson内部采用了看门狗机制,开启watchDog机制,不停的更新锁的有效期(开启一个任务,在锁释放时间的1/3长后执行,执行的任务为本身,即递归调用,每1/3,重置有效期),这种看门狗机制,也是在锁释放时取消的。
Redisson的multilock
使用多个分布式Redis节点,每个Redis上构建一个锁,每次操作获取锁的时候,需要同时能够从多个Redis节点成功获取到锁,才视为成功获取到锁。
这种方式实际上构成了一个连锁,缺点在于运维成本高,实现复杂。
@BeforeEach是一种在软件开发中常见的测试框架中使用的注解。它通常用于JUnit或其他类似的单元测试框架中,用于标记在每个测试方法之前执行的设置操作。
使用:
秒杀优化
Redis缓存解耦
原始的秒杀业务需求,首先得判断秒杀库存,然后查询订单检验是否符合一人一单,从而锁定秒杀资格,随后再通过操作数据库修改库存,创建订单。
整个流程串联步骤较多,且频繁操作数据库,导致响应较慢。
其实业务可以拆解为两步:锁定秒杀劵和生成秒杀劵。锁定秒杀劵的请求对高并发的要求更严格,可以通过Redis缓存来实现,在锁定秒杀劵后,相当于用于订餐,给了用户一张小票,这张小票的信息会保存在阻塞队列中,开启一个异步线程来消费阻塞队列中的订单,生成相应的订单到数据库中。
具体实施时,可以采用lua脚本实现对Redis的操作,确保代码执行的原子性,异步线程对于阻塞队列的处理可以参照数据库的连接性能来构建。
Redis消息队列
基于阻塞队列来处理Redis生成的优惠券订单,有很大的问题:当高并发、高优惠券发放时,阻塞队列的长度却是有限的,而受限于JVM的内存,阻塞队列设置太大,很有可能导致OOM。
为此,应该使用消息队列在存放Redis生成的优惠券订单消息。
对于大型规模的消息处理场景,可以使用kafka、rabbitMq、rocketMq。
小规模场景,可以使用Redis自带的消息队列服务:
基于List结构
使用BRPOP、BLPOP来实现阻塞效果。
基于List消息队列的优缺点:
优点:
- 利用Redis存储,不受限于JVM内存上限
- 基于Redis持久化机制,数据安全性有保证
- 可以保证消息的有序性
缺点:
- 如果消息处理过程中,出现异常,则消息就丢失了
- 只支持单消费者模式。
基于PubSub的消息队列
相比于List结构的消息队列,基于PubSub的消息队列摩擦,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。
基于Stream的消息队列
可以基于阻塞方式和&符号,读取最新的消息。
但是有漏读消息的风险,因为在读取到一条消息,并且消费消息的时候,这期间又来了多条消息,但是只能读取到最后发来的这条。
基于Stream的消息队列 - 消费者组
消费者组:将多个消费者划分到一个组中,监听同一个队列,具备以下特点:
- 消息分流:队列中的消息会分流给组内不同的消费者,而不是重复消费,从而加快消息处理的速度。
- 消息标识:消费者组会维护一个标识,记录最后一个被处理的消息,哪怕消费者宕机重启,还是会从标识之后开始读取消息,确保每一个消息都会被消费。
- 消息确认:消费者获取消息后,消息处于pending状态,并存入一个pending-list,当处理完毕后,需要通过XACK确认消息,标记消息为已处理,才会从pending-list移除。