目录
基于Stream的消息队列
Redis优化秒杀
登录头
改进秒杀业务,调高并发性能
Redis消息队列实现异步秒杀
编辑基于List结构模拟消息队列
基于PuSub的消息队列
编辑
基于Stream的消息队列
Redis消息队列
基于Stream的消息队列
Redis优化秒杀
登录头
改进秒杀业务,调高并发性能
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {@Resourceprivate ISeckillVoucherService seckillVoucherService;@Resourceprivate RedisIdWorker redisIdWorker;@Resourceprivate StringRedisTemplate stringRedisTemplate;@Resourceprivate RedissonClient redissonClient;private static final DefaultRedisScript<Long> SECKILL_SCRIPT;static {SECKILL_SCRIPT = new DefaultRedisScript<>();SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));SECKILL_SCRIPT.setResultType(Long.class);}
// 阻塞队列private BlockingQueue<VoucherOrder> orederTasks = new ArrayBlockingQueue(1024*1024);
//创建线程池private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();@PostConstructprivate void init(){//TODO 类初始化的时候执行线程池SECKILL_ORDER_EXECUTOR.submit(new VocherOrderHandler());}private class VocherOrderHandler implements Runnable{@Overridepublic void run() {while (true) {try {//1.获取队列中的订单信息VoucherOrder voucherOrder = orederTasks.take();
// 2.创建订单handleVocherOrder(voucherOrder);}catch (Exception e) {
// e.printStackTrace();log.error("处理订单异常" ,e);}}}}private void handleVocherOrder(VoucherOrder voucherOrder) {Long userId = voucherOrder.getUserId();
// Long userId = UserHolder.getUser().getId();
// synchronized锁还是有线程问题
// synchronized(userId.toString().intern()) {//intern返回字符串的规范表示//TODO 分布锁//TODO 创建锁对象
// SimpleRedisLock lock = new SimpleRedisLock("order" + userId, stringRedisTemplate);
// TODO 使用工具RedissonRLock lock = redissonClient.getLock("lock:order" + userId);//获取锁//TODOboolean isLock = lock.tryLock();if ( !isLock ){//获取锁失败,返回错误或重试log.error("不允许重复下单");return ;}try {//获取代理对象(事务)事务生效//TODO 获取锁之后再创建事务
// IVoucherOrderService proxy = (IVoucherOrderService)AopContext.currentProxy();//TODO 事务提交完再释放锁proxy.createVoucherOrderTwo(voucherOrder);//事务能够//TODO 可以避免事务没提交就释放锁的安全问题} finally {//释放锁lock.unlock();}}private IVoucherOrderService proxy;// TODO@Overridepublic Result seckillVoucher(Long voucherId) {Long userId = UserHolder.getUser().getId();//1.执行lua脚本Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(), userId.toString());//2.判断结果为0int r = result.intValue();if ( r!=0 ) {//2.1不为0,代表没有购买资格return Result.fail(r==1?"库存不足":"不能重复下单");}//TODO 2.2为0, 有购买资格,把下单信息保存到阻塞队列long orderId = redisIdWorker.nextId("order");//2.3.创建订单VoucherOrder voucherOrder = new VoucherOrder();//2.3订单divoucherOrder.setId(orderId);//2.4用户id
// Long userId = UserHolder.getUser().getId();voucherOrder.setUserId(userId);//2.5代金卷idvoucherOrder.setVoucherId(voucherId);//TODO 2.6 订单放入堵塞队列orederTasks.add(voucherOrder);//TODO 3.获取代理对象 创建全局变量其他地方才可以用proxy = (IVoucherOrderService)AopContext.currentProxy();//7.返回订单//TODO 保存阻塞队列//3.返回订单idreturn Result.ok(orderId);}/* @Overridepublic Result seckillVoucher(Long voucherId) {//1.查询优惠卷SeckillVoucher voucher = seckillVoucherService.getById(voucherId);//2.判断秒杀是否开始if ( voucher.getBeginTime().isAfter(LocalDateTime.now()) ) {
// 秒杀尚未开始return Result.fail ("秒杀尚未开始!");}//3.判断秒杀是否已经结束if ( voucher.getEndTime().isBefore(LocalDateTime.now()) ) {return Result.fail ("秒杀已结束!");}//4.判断库存是否充足if ( voucher.getStock()<1 ) {//库存不足return Result.fail("库存不足");}
// -------Long userId = UserHolder.getUser().getId();
// synchronized锁还是有线程问题
// synchronized(userId.toString().intern()) {//intern返回字符串的规范表示//TODO 分布锁//TODO 创建锁对象
// SimpleRedisLock lock = new SimpleRedisLock("order" + userId, stringRedisTemplate);
// TODO 使用工具RedissonRLock lock = redissonClient.getLock("lock:order" + userId);//获取锁//TODOboolean isLock = lock.tryLock();if ( !isLock ){//获取锁失败,返回错误或重试return Result.fail("不允许重复下单");}try {//获取代理对象(事务)事务生效//TODO 获取锁之后再创建事务IVoucherOrderService proxy = (IVoucherOrderService)AopContext.currentProxy();//TODO 事务提交完再释放锁return proxy.createVoucherOrder(voucherId);//事务能够//TODO 可以避免事务没提交就释放锁的安全问题} finally {//释放锁lock.unlock();}
//
// }}
*/@Transactionalpublic void createVoucherOrderTwo(VoucherOrder voucherOrder) {//TODO 6.一人一单
// TODO 不是在主线程了,在线程池了Long userId = voucherOrder.getUserId();
// userId.toString()底层代码是创建对象,所以有可能还是不一样的
// synchronized(userId.toString().intern()) {//intern返回字符串的规范表示//TODO 6.1查询订单Integer count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();//TODO 6.2判断是否存在if ( count > 0 ) {//用户已经买过了log.error("用户已经买过了一次了");return ;}//5.扣减库存boolean sucess = seckillVoucherService.update().setSql("stock = stock-1")//set stock = stock-1.gt("stock", "0")//可以解决超卖 where id ?and stock >0.eq("voucher_id", voucherOrder.getVoucherId()).update();//
// .eq("stock",voucher.getStock()).update();//where id ?and stock = ?if ( !sucess ) {//扣减不足log.error("库存不足");return ;}//7.0创建订单save(voucherOrder);}@Transactional//加入事务public Result createVoucherOrder(Long voucherId) {//TODO 6.一人一单Long userId = UserHolder.getUser().getId();
// userId.toString()底层代码是创建对象,所以有可能还是不一样的
// synchronized(userId.toString().intern()) {//intern返回字符串的规范表示//TODO 6.1查询订单Integer count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();//TODO 6.2判断是否存在if ( count > 0 ) {//用户已经买过了return Result.fail("用户已经买过了一次了");}//5.扣减库存boolean sucess = seckillVoucherService.update().setSql("stock = stock-1")//set stock = stock-1.gt("stock", "0")//可以解决超卖 where id ?and stock >0.eq("voucher_id", voucherId).update();//
// .eq("stock",voucher.getStock()).update();//where id ?and stock = ?if ( !sucess ) {//扣减不足return Result.fail("库存不足");}//6.创建订单VoucherOrder voucherOrder = new VoucherOrder();//6.1订单dilong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);//6.2用户id
// Long userId = UserHolder.getUser().getId();voucherOrder.setUserId(userId);//6.3代金卷idvoucherOrder.setVoucherId(voucherId);save(voucherOrder);//7.返回订单return Result.ok(orderId);}}
Redis消息队列实现异步秒杀
基于List结构模拟消息队列
基于PuSub的消息队列
基于Stream的消息队列
Redis消息队列
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {@Resourceprivate ISeckillVoucherService seckillVoucherService;@Resourceprivate RedisIdWorker redisIdWorker;@Resourceprivate StringRedisTemplate stringRedisTemplate;@Resourceprivate RedissonClient redissonClient;private static final DefaultRedisScript<Long> SECKILL_SCRIPT;static {SECKILL_SCRIPT = new DefaultRedisScript<>();SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));SECKILL_SCRIPT.setResultType(Long.class);}
// TODO
//创建线程池private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();@PostConstructprivate void init(){//TODO 类初始化的时候执行线程池SECKILL_ORDER_EXECUTOR.submit(new VocherOrderHandler());}private class VocherOrderHandler implements Runnable{String querName = "stream.orders";@Overridepublic void run() {while (true) {try {//1.获取消息队列中的订单信息
// XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.order >List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create(querName, ReadOffset.lastConsumed()));//判断消息获取是否成功if ( list == null || list.isEmpty() ) {//2.1如果获取失败,说明没有消息,继续下一次循环continue;}//3.解析消息中的订单信息MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> value = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);//4.如果获取成功,可以下单handleVocherOrder(voucherOrder);//ACK确认 SACK stream.orders g1 idstringRedisTemplate.opsForStream().acknowledge(querName,"g1",record.getId());}catch (Exception e) {
// e.printStackTrace();log.error("处理订单异常" ,e);handlePendingList();}}}private void handlePendingList() {while (true) {try {//1.获取pending-list中的订单信息
// XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.order 0List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1),StreamOffset.create(querName, ReadOffset.from("0")));//判断消息获取是否成功if ( list == null || list.isEmpty() ) {//2.1如果获取失败,说明pending-list没有异常消息,结束循环continue;}//3.解析消息中的订单信息MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> value = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);//4.如果获取成功,可以下单handleVocherOrder(voucherOrder);//ACK确认 SACK stream.orders g1 idstringRedisTemplate.opsForStream().acknowledge(querName,"g1",record.getId());}catch (Exception e) {
// e.printStackTrace();log.error("处理pending-list订单异常" ,e);try {Thread.sleep(20);}catch (InterruptedException interruptedException) {interruptedException.printStackTrace();}}}}}/*// 阻塞队列private BlockingQueue<VoucherOrder> orederTasks = new ArrayBlockingQueue(1024*1024);private class VocherOrderHandler implements Runnable{@Overridepublic void run() {while (true) {try {//1.获取队列中的订单信息VoucherOrder voucherOrder = orederTasks.take();
// 2.创建订单handleVocherOrder(voucherOrder);}catch (Exception e) {
// e.printStackTrace();log.error("处理订单异常" ,e);}}}}*/private void handleVocherOrder(VoucherOrder voucherOrder) {Long userId = voucherOrder.getUserId();
// Long userId = UserHolder.getUser().getId();
// synchronized锁还是有线程问题
// synchronized(userId.toString().intern()) {//intern返回字符串的规范表示//TODO 分布锁//TODO 创建锁对象
// SimpleRedisLock lock = new SimpleRedisLock("order" + userId, stringRedisTemplate);
// TODO 使用工具RedissonRLock lock = redissonClient.getLock("lock:order" + userId);//获取锁//TODOboolean isLock = lock.tryLock();if ( !isLock ){//获取锁失败,返回错误或重试log.error("不允许重复下单");return ;}try {//获取代理对象(事务)事务生效//TODO 获取锁之后再创建事务
// IVoucherOrderService proxy = (IVoucherOrderService)AopContext.currentProxy();//TODO 事务提交完再释放锁proxy.createVoucherOrderTwo(voucherOrder);//事务能够//TODO 可以避免事务没提交就释放锁的安全问题} finally {//释放锁lock.unlock();}}private IVoucherOrderService proxy;//TODO@Overridepublic Result seckillVoucher(Long voucherId) {
// 获取用户Long userId = UserHolder.getUser().getId();
// 获取订单idlong orderId = redisIdWorker.nextId("order");//1.执行lua脚本Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(), userId.toString(),String.valueOf(orderId));//2.判断结果为0int r = result.intValue();if ( r!=0 ) {//2.1不为0,代表没有购买资格return Result.fail(r==1?"库存不足":"不能重复下单");}//TODO 2.2为0, 有购买资格,把下单信息保存到阻塞队列//TODO 3.获取代理对象 创建全局变量其他地方才可以用proxy = (IVoucherOrderService)AopContext.currentProxy();//7.返回订单//TODO 保存阻塞队列//3.返回订单idreturn Result.ok(orderId);}// TODO/* @Overridepublic Result seckillVoucher(Long voucherId) {Long userId = UserHolder.getUser().getId();//1.执行lua脚本Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(), userId.toString());//2.判断结果为0int r = result.intValue();if ( r!=0 ) {//2.1不为0,代表没有购买资格return Result.fail(r==1?"库存不足":"不能重复下单");}//TODO 2.2为0, 有购买资格,把下单信息保存到阻塞队列long orderId = redisIdWorker.nextId("order");//2.3.创建订单VoucherOrder voucherOrder = new VoucherOrder();//2.3订单divoucherOrder.setId(orderId);//2.4用户id
// Long userId = UserHolder.getUser().getId();voucherOrder.setUserId(userId);//2.5代金卷idvoucherOrder.setVoucherId(voucherId);//TODO 2.6 订单放入堵塞队列orederTasks.add(voucherOrder);//TODO 3.获取代理对象 创建全局变量其他地方才可以用proxy = (IVoucherOrderService)AopContext.currentProxy();//7.返回订单//TODO 保存阻塞队列//3.返回订单idreturn Result.ok(orderId);}*//* @Overridepublic Result seckillVoucher(Long voucherId) {//1.查询优惠卷SeckillVoucher voucher = seckillVoucherService.getById(voucherId);//2.判断秒杀是否开始if ( voucher.getBeginTime().isAfter(LocalDateTime.now()) ) {
// 秒杀尚未开始return Result.fail ("秒杀尚未开始!");}//3.判断秒杀是否已经结束if ( voucher.getEndTime().isBefore(LocalDateTime.now()) ) {return Result.fail ("秒杀已结束!");}//4.判断库存是否充足if ( voucher.getStock()<1 ) {//库存不足return Result.fail("库存不足");}
// -------Long userId = UserHolder.getUser().getId();
// synchronized锁还是有线程问题
// synchronized(userId.toString().intern()) {//intern返回字符串的规范表示//TODO 分布锁//TODO 创建锁对象
// SimpleRedisLock lock = new SimpleRedisLock("order" + userId, stringRedisTemplate);
// TODO 使用工具RedissonRLock lock = redissonClient.getLock("lock:order" + userId);//获取锁//TODOboolean isLock = lock.tryLock();if ( !isLock ){//获取锁失败,返回错误或重试return Result.fail("不允许重复下单");}try {//获取代理对象(事务)事务生效//TODO 获取锁之后再创建事务IVoucherOrderService proxy = (IVoucherOrderService)AopContext.currentProxy();//TODO 事务提交完再释放锁return proxy.createVoucherOrder(voucherId);//事务能够//TODO 可以避免事务没提交就释放锁的安全问题} finally {//释放锁lock.unlock();}
//
// }}
*/@Transactionalpublic void createVoucherOrderTwo(VoucherOrder voucherOrder) {//TODO 6.一人一单
// TODO 不是在主线程了,在线程池了Long userId = voucherOrder.getUserId();
// userId.toString()底层代码是创建对象,所以有可能还是不一样的
// synchronized(userId.toString().intern()) {//intern返回字符串的规范表示//TODO 6.1查询订单Integer count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();//TODO 6.2判断是否存在if ( count > 0 ) {//用户已经买过了log.error("用户已经买过了一次了");return ;}//5.扣减库存boolean sucess = seckillVoucherService.update().setSql("stock = stock-1")//set stock = stock-1.gt("stock", "0")//可以解决超卖 where id ?and stock >0.eq("voucher_id", voucherOrder.getVoucherId()).update();//
// .eq("stock",voucher.getStock()).update();//where id ?and stock = ?if ( !sucess ) {//扣减不足log.error("库存不足");return ;}//7.0创建订单save(voucherOrder);}@Transactional//加入事务public Result createVoucherOrder(Long voucherId) {//TODO 6.一人一单Long userId = UserHolder.getUser().getId();
// userId.toString()底层代码是创建对象,所以有可能还是不一样的
// synchronized(userId.toString().intern()) {//intern返回字符串的规范表示//TODO 6.1查询订单Integer count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();//TODO 6.2判断是否存在if ( count > 0 ) {//用户已经买过了return Result.fail("用户已经买过了一次了");}//5.扣减库存boolean sucess = seckillVoucherService.update().setSql("stock = stock-1")//set stock = stock-1.gt("stock", "0")//可以解决超卖 where id ?and stock >0.eq("voucher_id", voucherId).update();//
// .eq("stock",voucher.getStock()).update();//where id ?and stock = ?if ( !sucess ) {//扣减不足return Result.fail("库存不足");}//6.创建订单VoucherOrder voucherOrder = new VoucherOrder();//6.1订单dilong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);//6.2用户id
// Long userId = UserHolder.getUser().getId();voucherOrder.setUserId(userId);//6.3代金卷idvoucherOrder.setVoucherId(voucherId);save(voucherOrder);//7.返回订单return Result.ok(orderId);}}