Redis 优惠卷秒杀(二) 异步秒杀、基于Stream的消息队列处理

目录

基于Stream的消息队列

Redis优化秒杀

登录头 

改进秒杀业务,调高并发性能

Redis消息队列实现异步秒杀

​编辑基于List结构模拟消息队列 

基于PuSub的消息队列

 ​编辑

  基于Stream的消息队列

Redis消息队列


基于Stream的消息队列

Redis优化秒杀

登录头 

 

改进秒杀业务,调高并发性能


@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {@Resourceprivate ISeckillVoucherService seckillVoucherService;@Resourceprivate RedisIdWorker redisIdWorker;@Resourceprivate StringRedisTemplate stringRedisTemplate;@Resourceprivate RedissonClient redissonClient;private static final DefaultRedisScript<Long> SECKILL_SCRIPT;static {SECKILL_SCRIPT = new DefaultRedisScript<>();SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));SECKILL_SCRIPT.setResultType(Long.class);}
//    阻塞队列private BlockingQueue<VoucherOrder> orederTasks = new ArrayBlockingQueue(1024*1024);
//创建线程池private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();@PostConstructprivate void init(){//TODO 类初始化的时候执行线程池SECKILL_ORDER_EXECUTOR.submit(new VocherOrderHandler());}private class VocherOrderHandler implements Runnable{@Overridepublic void run() {while (true) {try {//1.获取队列中的订单信息VoucherOrder voucherOrder = orederTasks.take();
//                    2.创建订单handleVocherOrder(voucherOrder);}catch (Exception e) {
//                    e.printStackTrace();log.error("处理订单异常" ,e);}}}}private void handleVocherOrder(VoucherOrder voucherOrder) {Long userId = voucherOrder.getUserId();
//        Long userId = UserHolder.getUser().getId();
//        synchronized锁还是有线程问题
//        synchronized(userId.toString().intern()) {//intern返回字符串的规范表示//TODO 分布锁//TODO 创建锁对象
//        SimpleRedisLock lock = new SimpleRedisLock("order" + userId, stringRedisTemplate);
//        TODO 使用工具RedissonRLock lock = redissonClient.getLock("lock:order" + userId);//获取锁//TODOboolean isLock = lock.tryLock();if ( !isLock ){//获取锁失败,返回错误或重试log.error("不允许重复下单");return ;}try {//获取代理对象(事务)事务生效//TODO 获取锁之后再创建事务
//            IVoucherOrderService proxy = (IVoucherOrderService)AopContext.currentProxy();//TODO 事务提交完再释放锁proxy.createVoucherOrderTwo(voucherOrder);//事务能够//TODO 可以避免事务没提交就释放锁的安全问题} finally {//释放锁lock.unlock();}}private IVoucherOrderService proxy;//    TODO@Overridepublic Result seckillVoucher(Long voucherId) {Long userId = UserHolder.getUser().getId();//1.执行lua脚本Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(), userId.toString());//2.判断结果为0int r = result.intValue();if ( r!=0 ) {//2.1不为0,代表没有购买资格return Result.fail(r==1?"库存不足":"不能重复下单");}//TODO 2.2为0, 有购买资格,把下单信息保存到阻塞队列long orderId = redisIdWorker.nextId("order");//2.3.创建订单VoucherOrder voucherOrder = new VoucherOrder();//2.3订单divoucherOrder.setId(orderId);//2.4用户id
//        Long userId = UserHolder.getUser().getId();voucherOrder.setUserId(userId);//2.5代金卷idvoucherOrder.setVoucherId(voucherId);//TODO 2.6 订单放入堵塞队列orederTasks.add(voucherOrder);//TODO 3.获取代理对象 创建全局变量其他地方才可以用proxy = (IVoucherOrderService)AopContext.currentProxy();//7.返回订单//TODO 保存阻塞队列//3.返回订单idreturn Result.ok(orderId);}/*  @Overridepublic Result seckillVoucher(Long voucherId) {//1.查询优惠卷SeckillVoucher voucher = seckillVoucherService.getById(voucherId);//2.判断秒杀是否开始if ( voucher.getBeginTime().isAfter(LocalDateTime.now()) ) {
//            秒杀尚未开始return Result.fail ("秒杀尚未开始!");}//3.判断秒杀是否已经结束if ( voucher.getEndTime().isBefore(LocalDateTime.now()) ) {return Result.fail ("秒杀已结束!");}//4.判断库存是否充足if ( voucher.getStock()<1 ) {//库存不足return Result.fail("库存不足");}
//        -------Long userId = UserHolder.getUser().getId();
//        synchronized锁还是有线程问题
//        synchronized(userId.toString().intern()) {//intern返回字符串的规范表示//TODO 分布锁//TODO 创建锁对象
//        SimpleRedisLock lock = new SimpleRedisLock("order" + userId, stringRedisTemplate);
//        TODO 使用工具RedissonRLock lock = redissonClient.getLock("lock:order" + userId);//获取锁//TODOboolean isLock = lock.tryLock();if ( !isLock ){//获取锁失败,返回错误或重试return Result.fail("不允许重复下单");}try {//获取代理对象(事务)事务生效//TODO 获取锁之后再创建事务IVoucherOrderService proxy = (IVoucherOrderService)AopContext.currentProxy();//TODO 事务提交完再释放锁return proxy.createVoucherOrder(voucherId);//事务能够//TODO 可以避免事务没提交就释放锁的安全问题} finally {//释放锁lock.unlock();}
//
//        }}
*/@Transactionalpublic void createVoucherOrderTwo(VoucherOrder voucherOrder) {//TODO 6.一人一单
//      TODO 不是在主线程了,在线程池了Long userId = voucherOrder.getUserId();
//        userId.toString()底层代码是创建对象,所以有可能还是不一样的
//        synchronized(userId.toString().intern()) {//intern返回字符串的规范表示//TODO 6.1查询订单Integer count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();//TODO 6.2判断是否存在if ( count > 0 ) {//用户已经买过了log.error("用户已经买过了一次了");return ;}//5.扣减库存boolean sucess = seckillVoucherService.update().setSql("stock = stock-1")//set stock = stock-1.gt("stock", "0")//可以解决超卖 where id ?and stock >0.eq("voucher_id", voucherOrder.getVoucherId()).update();//
//                .eq("stock",voucher.getStock()).update();//where id ?and stock = ?if ( !sucess ) {//扣减不足log.error("库存不足");return ;}//7.0创建订单save(voucherOrder);}@Transactional//加入事务public   Result createVoucherOrder(Long voucherId) {//TODO 6.一人一单Long userId = UserHolder.getUser().getId();
//        userId.toString()底层代码是创建对象,所以有可能还是不一样的
//        synchronized(userId.toString().intern()) {//intern返回字符串的规范表示//TODO 6.1查询订单Integer count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();//TODO 6.2判断是否存在if ( count > 0 ) {//用户已经买过了return Result.fail("用户已经买过了一次了");}//5.扣减库存boolean sucess = seckillVoucherService.update().setSql("stock = stock-1")//set stock = stock-1.gt("stock", "0")//可以解决超卖 where id ?and stock >0.eq("voucher_id", voucherId).update();//
//                .eq("stock",voucher.getStock()).update();//where id ?and stock = ?if ( !sucess ) {//扣减不足return Result.fail("库存不足");}//6.创建订单VoucherOrder voucherOrder = new VoucherOrder();//6.1订单dilong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);//6.2用户id
//        Long userId = UserHolder.getUser().getId();voucherOrder.setUserId(userId);//6.3代金卷idvoucherOrder.setVoucherId(voucherId);save(voucherOrder);//7.返回订单return Result.ok(orderId);}}

Redis消息队列实现异步秒杀

基于List结构模拟消息队列 

 

基于PuSub的消息队列

 

 

  基于Stream的消息队列

 

 

Redis消息队列


@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {@Resourceprivate ISeckillVoucherService seckillVoucherService;@Resourceprivate RedisIdWorker redisIdWorker;@Resourceprivate StringRedisTemplate stringRedisTemplate;@Resourceprivate RedissonClient redissonClient;private static final DefaultRedisScript<Long> SECKILL_SCRIPT;static {SECKILL_SCRIPT = new DefaultRedisScript<>();SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));SECKILL_SCRIPT.setResultType(Long.class);}
//    TODO
//创建线程池private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();@PostConstructprivate void init(){//TODO 类初始化的时候执行线程池SECKILL_ORDER_EXECUTOR.submit(new VocherOrderHandler());}private class VocherOrderHandler implements Runnable{String querName = "stream.orders";@Overridepublic void run() {while (true) {try {//1.获取消息队列中的订单信息
//                    XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.order >List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create(querName, ReadOffset.lastConsumed()));//判断消息获取是否成功if ( list == null || list.isEmpty() ) {//2.1如果获取失败,说明没有消息,继续下一次循环continue;}//3.解析消息中的订单信息MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> value = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);//4.如果获取成功,可以下单handleVocherOrder(voucherOrder);//ACK确认 SACK stream.orders g1 idstringRedisTemplate.opsForStream().acknowledge(querName,"g1",record.getId());}catch (Exception e) {
//                    e.printStackTrace();log.error("处理订单异常" ,e);handlePendingList();}}}private void handlePendingList() {while (true) {try {//1.获取pending-list中的订单信息
//                    XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.order 0List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1),StreamOffset.create(querName, ReadOffset.from("0")));//判断消息获取是否成功if ( list == null || list.isEmpty() ) {//2.1如果获取失败,说明pending-list没有异常消息,结束循环continue;}//3.解析消息中的订单信息MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> value = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);//4.如果获取成功,可以下单handleVocherOrder(voucherOrder);//ACK确认 SACK stream.orders g1 idstringRedisTemplate.opsForStream().acknowledge(querName,"g1",record.getId());}catch (Exception e) {
//                    e.printStackTrace();log.error("处理pending-list订单异常" ,e);try {Thread.sleep(20);}catch (InterruptedException interruptedException) {interruptedException.printStackTrace();}}}}}/*//    阻塞队列private BlockingQueue<VoucherOrder> orederTasks = new ArrayBlockingQueue(1024*1024);private class VocherOrderHandler implements Runnable{@Overridepublic void run() {while (true) {try {//1.获取队列中的订单信息VoucherOrder voucherOrder = orederTasks.take();
//                    2.创建订单handleVocherOrder(voucherOrder);}catch (Exception e) {
//                    e.printStackTrace();log.error("处理订单异常" ,e);}}}}*/private void handleVocherOrder(VoucherOrder voucherOrder) {Long userId = voucherOrder.getUserId();
//        Long userId = UserHolder.getUser().getId();
//        synchronized锁还是有线程问题
//        synchronized(userId.toString().intern()) {//intern返回字符串的规范表示//TODO 分布锁//TODO 创建锁对象
//        SimpleRedisLock lock = new SimpleRedisLock("order" + userId, stringRedisTemplate);
//        TODO 使用工具RedissonRLock lock = redissonClient.getLock("lock:order" + userId);//获取锁//TODOboolean isLock = lock.tryLock();if ( !isLock ){//获取锁失败,返回错误或重试log.error("不允许重复下单");return ;}try {//获取代理对象(事务)事务生效//TODO 获取锁之后再创建事务
//            IVoucherOrderService proxy = (IVoucherOrderService)AopContext.currentProxy();//TODO 事务提交完再释放锁proxy.createVoucherOrderTwo(voucherOrder);//事务能够//TODO 可以避免事务没提交就释放锁的安全问题} finally {//释放锁lock.unlock();}}private IVoucherOrderService proxy;//TODO@Overridepublic Result seckillVoucher(Long voucherId) {
//        获取用户Long userId = UserHolder.getUser().getId();
//        获取订单idlong orderId = redisIdWorker.nextId("order");//1.执行lua脚本Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(), userId.toString(),String.valueOf(orderId));//2.判断结果为0int r = result.intValue();if ( r!=0 ) {//2.1不为0,代表没有购买资格return Result.fail(r==1?"库存不足":"不能重复下单");}//TODO 2.2为0, 有购买资格,把下单信息保存到阻塞队列//TODO 3.获取代理对象 创建全局变量其他地方才可以用proxy = (IVoucherOrderService)AopContext.currentProxy();//7.返回订单//TODO 保存阻塞队列//3.返回订单idreturn Result.ok(orderId);}//    TODO/* @Overridepublic Result seckillVoucher(Long voucherId) {Long userId = UserHolder.getUser().getId();//1.执行lua脚本Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(), userId.toString());//2.判断结果为0int r = result.intValue();if ( r!=0 ) {//2.1不为0,代表没有购买资格return Result.fail(r==1?"库存不足":"不能重复下单");}//TODO 2.2为0, 有购买资格,把下单信息保存到阻塞队列long orderId = redisIdWorker.nextId("order");//2.3.创建订单VoucherOrder voucherOrder = new VoucherOrder();//2.3订单divoucherOrder.setId(orderId);//2.4用户id
//        Long userId = UserHolder.getUser().getId();voucherOrder.setUserId(userId);//2.5代金卷idvoucherOrder.setVoucherId(voucherId);//TODO 2.6 订单放入堵塞队列orederTasks.add(voucherOrder);//TODO 3.获取代理对象 创建全局变量其他地方才可以用proxy = (IVoucherOrderService)AopContext.currentProxy();//7.返回订单//TODO 保存阻塞队列//3.返回订单idreturn Result.ok(orderId);}*//*  @Overridepublic Result seckillVoucher(Long voucherId) {//1.查询优惠卷SeckillVoucher voucher = seckillVoucherService.getById(voucherId);//2.判断秒杀是否开始if ( voucher.getBeginTime().isAfter(LocalDateTime.now()) ) {
//            秒杀尚未开始return Result.fail ("秒杀尚未开始!");}//3.判断秒杀是否已经结束if ( voucher.getEndTime().isBefore(LocalDateTime.now()) ) {return Result.fail ("秒杀已结束!");}//4.判断库存是否充足if ( voucher.getStock()<1 ) {//库存不足return Result.fail("库存不足");}
//        -------Long userId = UserHolder.getUser().getId();
//        synchronized锁还是有线程问题
//        synchronized(userId.toString().intern()) {//intern返回字符串的规范表示//TODO 分布锁//TODO 创建锁对象
//        SimpleRedisLock lock = new SimpleRedisLock("order" + userId, stringRedisTemplate);
//        TODO 使用工具RedissonRLock lock = redissonClient.getLock("lock:order" + userId);//获取锁//TODOboolean isLock = lock.tryLock();if ( !isLock ){//获取锁失败,返回错误或重试return Result.fail("不允许重复下单");}try {//获取代理对象(事务)事务生效//TODO 获取锁之后再创建事务IVoucherOrderService proxy = (IVoucherOrderService)AopContext.currentProxy();//TODO 事务提交完再释放锁return proxy.createVoucherOrder(voucherId);//事务能够//TODO 可以避免事务没提交就释放锁的安全问题} finally {//释放锁lock.unlock();}
//
//        }}
*/@Transactionalpublic void createVoucherOrderTwo(VoucherOrder voucherOrder) {//TODO 6.一人一单
//      TODO 不是在主线程了,在线程池了Long userId = voucherOrder.getUserId();
//        userId.toString()底层代码是创建对象,所以有可能还是不一样的
//        synchronized(userId.toString().intern()) {//intern返回字符串的规范表示//TODO 6.1查询订单Integer count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();//TODO 6.2判断是否存在if ( count > 0 ) {//用户已经买过了log.error("用户已经买过了一次了");return ;}//5.扣减库存boolean sucess = seckillVoucherService.update().setSql("stock = stock-1")//set stock = stock-1.gt("stock", "0")//可以解决超卖 where id ?and stock >0.eq("voucher_id", voucherOrder.getVoucherId()).update();//
//                .eq("stock",voucher.getStock()).update();//where id ?and stock = ?if ( !sucess ) {//扣减不足log.error("库存不足");return ;}//7.0创建订单save(voucherOrder);}@Transactional//加入事务public   Result createVoucherOrder(Long voucherId) {//TODO 6.一人一单Long userId = UserHolder.getUser().getId();
//        userId.toString()底层代码是创建对象,所以有可能还是不一样的
//        synchronized(userId.toString().intern()) {//intern返回字符串的规范表示//TODO 6.1查询订单Integer count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();//TODO 6.2判断是否存在if ( count > 0 ) {//用户已经买过了return Result.fail("用户已经买过了一次了");}//5.扣减库存boolean sucess = seckillVoucherService.update().setSql("stock = stock-1")//set stock = stock-1.gt("stock", "0")//可以解决超卖 where id ?and stock >0.eq("voucher_id", voucherId).update();//
//                .eq("stock",voucher.getStock()).update();//where id ?and stock = ?if ( !sucess ) {//扣减不足return Result.fail("库存不足");}//6.创建订单VoucherOrder voucherOrder = new VoucherOrder();//6.1订单dilong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);//6.2用户id
//        Long userId = UserHolder.getUser().getId();voucherOrder.setUserId(userId);//6.3代金卷idvoucherOrder.setVoucherId(voucherId);save(voucherOrder);//7.返回订单return Result.ok(orderId);}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/18533.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

《剑来》语句摘录(九)

​ ​ ◆ 第一百二十六章 就山 ​ >> 忙中不出错&#xff0c;闲来无是非&#xff0c;都需要真本事的。 >> 路过一座不关门的宅子&#xff0c;院内有个老人&#xff0c;躺在藤椅上&#xff0c;正在闭眼养神&#xff0c;呼吸绵长&#xff0c;似已浅睡&#xff0c;手持…

MyBatis实现动态SQL更新

博主记得在一个周五快下班的下午&#xff0c;产品找到我&#xff08;为什么总感觉周五快下班就来活 &#x1f602;&#xff09;&#xff0c;跟我说有几个业务列表查询需要加上时间条件过滤数据&#xff0c;这个条件可能会变&#xff0c;不保证以后不修改&#xff0c;这个改动涉…

Grafana 图形面板定制方案

Grafana 在一个 Panel 中添加多数据源同时展示以及修改通过 transform 修改图表图例的方式。 多个数据在一个折线图中 在 Grafana 中我们可能会希望多个数据在一个Panel 中展示&#xff0c;比如&#xff1a; 通过编辑 Panel 增加 Query 数据我们即可做到&#xff1a;像上面中…

windows开机启动nginx(服务方式启动)

提示&#xff1a;本文章介绍如何借助Windows Service Wrapper小工具&#xff0c;将Nginx转换为Windows服务&#xff0c;在服务中心配置自启动&#xff0c;从而在开机时windows自行启动Nginx服务 Nginx是什么 官方链接&#xff1a;nginx下载 Nginx 是一个高性能的HTTP和反向代理…

大学生活动社交小程序开发笔记(1)

可研分析 大学生活动社交小程序是一种基于移动互联网的社交平台&#xff0c;旨在为大学生提供一个方便、快捷、安全的社交和活动交流平台 功能规划 活动发布&#xff1a;平台可以发布将要举行的活动&#xff0c;包括时间、地点、费用等信息&#xff0c;并邀请其他用户参加。…

Graphics Mill 11.1.18 -24-06-2023 Crack

Graphics Mill 是适用于 .NET 和 ASP.NET 开发人员的最强大的成像工具集。它允许用户轻松向 .NET 应用程序添加复杂的光栅和矢量图像处理功能。 加载和保存 JPEG、PNG 和另外 8 种图像格式 调整大小、裁剪、自动修复、色度键和 30 多种其他图像操作 可处理任何尺寸&#xff08…

Kkfileview | Docker | +Redis文件预览kkfile配置

文章目录 简介DockerRedis部署 简介 kkFileView为文件文档在线预览解决方案&#xff0c;该项目使用流行的spring boot搭建&#xff0c;易上手和部署&#xff0c;基本支持主流办公文档的在线预览&#xff0c;如doc,docx,xls,xlsx,ppt,pptx,pdf,txt,zip,rar,图片,视频,音频等等 …

WebSocket使用记录

使用视频地址 1、添加前端使用文件 2、后端配置 2.1添加依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency>2.2添加websocket配置类 import org.spri…

Android mac 交叉编译与ffmpeg编译踩坑记 (v7a 与 v8a and 动态库与静态库)

Android mac 交叉编译与ffmpeg编译踩坑记 环境: system: mac NDK: android-ndk-r17c Fffmpeg: ffmpeg-4.0.2 Cmake: 3.10.2 Gradle: 4.1.3 tips: 本文记录踩坑过程,具体细节如果感兴趣可以在评论区留言交流讨论! mac 编译 (动态库(so)) 首先来回顾一下,mac原始库是如何…

沉浸式翻译(immersive-translate):解决谷歌翻译无法使用的问题

前言&#xff1a; 在如今的全球化时代&#xff0c;语言不再是隔离人们交流的障碍。然而&#xff0c;在浏览外语网页时&#xff0c;我们常常遇到一个问题&#xff1a;无法准确理解其中的内容。 谷歌翻译是一款强大的翻译工具&#xff0c;但由于各种原因&#xff0c;我们可能无法…

负载均衡的知识点

目录 1.负载均衡是什么 2.负载均衡的分类 客户端负载均衡&#xff1a; 服务端负载均衡&#xff1a; 软件实现&#xff1a;根据OSI模型可以分为四层负载均衡和七层负载均衡 硬件实现&#xff1a; 附1&#xff1a;客户端和服务端&#xff1a; 附2&#xff1a;OSI…

nginx基本使用

这是一份完整的nginx配置文件&#xff1a; #user nobody; worker_processes 1;#error_log logs/error.log; #error_log logs/error.log notice; #error_log logs/error.log info;#pid logs/nginx.pid;events {worker_connections 1024; }http {include mi…