Redis:原理速成+项目实战——Redis实战10(Redis消息队列实现异步秒杀)

👨‍🎓作者简介:一位大四、研0学生,正在努力准备大四暑假的实习
🌌上期文章:Redis:原理速成+项目实战——Redis实战9(秒杀优化)
📚订阅专栏:Redis:原理速成+项目实战
希望文章对你们有所帮助

上一节已经实现了异步秒杀,也就是将秒杀分为两个环节:
1、判断是否有抢单资格(库存量是否充足、是否满足一人一单)、
2、下单操作(优惠券表中的库存量-1,订单表增加相应信息)
其中,第一步的操作放在了Redis中,可以有效提高效率,而真正大幅度提高效率的点还是因为我们将下单的操作交给了另一个开辟的线程,因为对数据库的操作并不需要什么时效性。

异步执行所需要的信息被封装并保存到了阻塞队列中,上一节分析了这会造成的问题:
1、内存限制问题
2、数据安全问题

消息队列可以解决这个问题,一般建议用专业的消息中间件来使用,最主流的当然就是RabbitMQ了,但是这边也讲解一下用Redis里面的一些数据结构来模拟出消息队列的效果,实现的话我感觉也挺容易的,只演示基于Stream消息队列实现异步秒杀。

Redis消息队列实现异步秒杀

  • 认识消息队列
  • 基于List实现消息队列
  • PubSub实现消息队列
  • Stream的单消费模式
  • Stream的消费者组模式
  • 基于Stream消息队列实现异步秒杀

认识消息队列

消息队列,也就是存放消息的队列,最简单的消息队列包括3个角色:
(1)消息队列(代理):存储和管理信息
(2)生产者:发消息到消息队列
(3)消费者:从消息队列中获取消息并处理
因此,异步秒杀的思路为:
在这里插入图片描述

这个思路与上一节用阻塞队列的思路是差不多的,但是有2点重要区别:
1、消息队列是JVM以外的独立服务,不受JVM内存的限制
2、消息队列不仅仅做数据存储,还确保了数据安全,存到消息队列中的消息会做持久化处理,并要求消费者要做出消息的确认,否则会持续将消息传递给消费者,确保消息至少被“签收”一次

基于List实现消息队列

List是一种双向链表,很容易模拟出队列。
需要注意的是,当消息队列中没有消息的时候,我们应当要让线程等待,而不是直接返回Null,因此这儿要用BRPOPBLPOP来实现阻塞效果(B表示阻塞)

优点:
(1)利用Redis存储,不受限于JVM内存上限
(2)基于Redis的持久化机制,保证数据安全性
(3)满足消息有序性
缺点:
(1)无法避免消息丢失(消息会从队列直接移除)
(2)只支持单消费者

PubSub实现消息队列

PubSub(发布订阅)是Redis2.0引入的,消费者可以订阅一个或多个channel(频道),生产者向对应channel发送消息后,所有订阅者都能收到相关消息。

优点:采用发布订阅模型,支持多生产、多消费
缺点:
(1)不支持数据持久化
(2)无法避免消息丢失
(3)消息堆积有上线,超出时数据丢失

Stream的单消费模式

Stream是Redis5.0引入的一种新数据类型,可以实现功能完善的消息队列。
在这里插入图片描述
例如:
在这里插入图片描述
读取消息:XREAD
在这里插入图片描述
例如,用XREAD读第一个消息:

XREAD COUNT 1 STREAMS users 0

用XREAD阻塞方式读取最新消息:

XREAD COUNT 1 BLOCK STREAMS users $

所以,在开发的时候,可以循环调用XREAD阻塞方式来查询最新消息,从而实现持久监听队列。
但是,当指定起始ID为$读取最新消息,处理一条消息的过程中,又有超过1条以上的消息到达队列,则下次获取也只能获取到最新的一条,会出现消息漏读

特点:
(1)消息可回溯
(2)一个消息可以被多个消费者读取
(3)可以阻塞读取
(4)有消息漏读的风险

Stream的消费者组模式

这一部分命令还是麻烦了,理解就行,要使用就去看文档就好了。

消费者组可以解决消息漏读的问题。
消费者组:将多个消费者划分到一个组中,监听同一个队列。

特点:
1、消息分流:队列中的消息会分流给组内不同消费者,而不是重复消费,从而加快消息处理速度
2、消息标识:消费者组会维护一个标识,记录最后一个被处理的消息,哪怕消费者宕机重启,还会从标识之后去读取消息,确保了每一个消息都会被消费
3、消息确认:消费者获取消息后,消息处于pending状态,存入pending-list,当处理完成后需要通过XACK来确认消息,标记消息为已处理,才会从pending-list中移出,可以解决消息丢失的问题

创建消费者组:
XGROUP CREATE key groupName ID [MKSTREAM]
删除指定消费者组
XGROUP DESTROY key groupName
给指定的消费者组添加消费者
XGROUP CREATECONSUMER key groupName consumerName
删除消费者组中的指定消费者
XGROUP DELCONSUMER key groupName consumerName

从消费者组读取消息:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key ID
其中,ID表示获取消息的起始ID:
(1)“>”:从下一个未消费的消息开始
(2)其它:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中第一个消息开始

基于Stream消费者组,我们利用消费者监听消息的基本思路:
1、使用阻塞模式尝试监听队列,没消息就继续监听,有消息就开线程处理消息,并在完成后ACK。
2、若没有成功ACK,抛出异常,那么消息就会留在padding-list中,这时候就需要读取padding-list获取异常消息并处理。

STREAM类型消息队列的XREADGROUP命令特点:

1、消息可回溯
2、可以多消费者争抢消息,加快消费速度
3、可以阻塞读取
4、没有消息漏读的风险
5、有消息确认机制,保证消息至少被消费一次

基于Stream消息队列实现异步秒杀

1、创建Stream类型的消息队列stream.orders和消费者组:

XGROUP CREATE stream.orders g1 0 MKSTREAM # 组名g1,起始位置为0

在这里插入图片描述
2、修改之前秒杀下单的Lua脚本,在认定有抢购资格后,直接向stream.orders中添加消息,内容包括voucherId、userId、orderId:

-- 1 参数列表
-- 1.1 优惠券id
local voucherId = ARGV[1]
-- 1.2 用户id
local userId = ARGV[2]
-- 1.3 订单id
local orderId = ARGV[3]-- 2 数据key
-- 2.1 库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2 订单key
local orderKey = 'seckill:order:' .. voucherId-- 3 脚本业务
-- 3.1 判断库存是够充足
if(tonumber(redis.call('get', stockKey)) <= 0) then-- 3.2 库存不足,返回1return 1
end
-- 3.2 判断用户是否下单,即判断用户id是不是这个set集合的成员
if(redis.call('sismember', orderKey, userId) == 1) then-- 3.2 存在,说明重复下单return 2
end
-- 3.4 扣库存
redis.call('incrby', stockKey, -1)
-- 3.5 下单(保存用户)
redis.call('sadd', orderKey, userId)
-- 3.6 发送消息到队列中,orderId的key指定为Id更好,因为订单实体类是这么定义的
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'Id', orderId)
return 0

与上次的代码相比,我们多增加了一个参数,所以我们要修改一下函数的调用:
在这里插入图片描述
这个参数的增加,在后续的编写中会省去一些麻烦。

3、项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单,整体的业务流程的代码如下:

@Slf4j
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {//注入秒杀优惠券的service@Resourceprivate ISeckillVoucherService seckillVoucherService;@Resourceprivate RedisIdWorker redisIdWorker;@Resourceprivate RedissonClient redissonClient;@Resourceprivate StringRedisTemplate stringRedisTemplate;public static final DefaultRedisScript<Long> SECKILL_SCRIPT;static {SECKILL_SCRIPT = new DefaultRedisScript<>();SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));SECKILL_SCRIPT.setResultType(Long.class);}//线程池private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();//线程任务,用户随时都能抢单,所以应该要在这个类被初始化的时候马上开始执行@PostConstruct  //该注解表示在当前类初始化完毕以后立即执行private void init(){SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());}private void handleVoucherOrder(VoucherOrder voucherOrder) {//获取用户,用户id不能再从UserHolder中取了,因为现在是从线程池获取的全新线程,不是主线程Long userId = voucherOrder.getUserId();//创建锁对象RLock lock = redissonClient.getLock("lock:order:" + userId);//获取锁boolean isLock = lock.tryLock();//判断是否获取锁成功if(!isLock){log.error("不允许重复下单");//理论上不会发生}try {proxy.createVoucherOrder(voucherOrder);} finally {lock.unlock();}}IVoucherOrderService proxy;private class VoucherOrderHandler implements Runnable{String queueName = "stream.orders";@Overridepublic void run() {while (true){try {//获取消息队列中的订单信息,XREADGROUP GROUP g1 c1 BLOCK 2000 STREAMS stream.ordersList<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create(queueName, ReadOffset.lastConsumed()));//判断消息获取是否成功if(list == null || list.isEmpty()) {//获取失败,说明没有消息,继续下一次循环continue;}//解析消息中的订单信息MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> values = record.getValue();//将其转变为VoucherOrder对象,忽略异常VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);//获取成功,下单handleVoucherOrder(voucherOrder);//ACK确认stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());} catch (Exception e) {//异常则表示没有被ACK确认,剩下的操作都是针对pending-list的log.error("处理订单异常", e);handlePendingList();}}}private void handlePendingList() {while (true){try {//获取pending-list中的订单信息 XREADGROUP g1 c1 COUNT 1 STREAMS stream.orders 0List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"), //消费者信息StreamReadOptions.empty().count(1),StreamOffset.create(queueName, ReadOffset.from("0")));if(list == null || list.isEmpty()) {//获取失败,说明没有消息,结束循环break;}//解析消息中的订单信息MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> values = record.getValue();//将其转变为VoucherOrder对象,忽略异常VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);//获取成功,下单handleVoucherOrder(voucherOrder);//ACK确认stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());} catch (Exception e) {log.error("处理pending-list订单异常", e);}}}}//秒杀优化,调用Lua的代码@Overridepublic Result seckillVoucher(Long voucherId) {//获取用户Long userId = UserHolder.getUser().getId();//获取订单idlong orderId = redisIdWorker.nextId("order");//执行Lua脚本Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(), userId.toString(), String.valueOf(orderId));//判断结果是否为0int r = result.intValue();if(r != 0){//不为0,没有购买资格return Result.fail(r == 1 ? "库存不足" : "不能重复下单");}//获取代理对象proxy = (IVoucherOrderService) AopContext.currentProxy();//返回订单idreturn Result.ok(orderId);}@Transactional(rollbackFor = Exception.class)public void createVoucherOrder(VoucherOrder voucherOrder) {Long userId = voucherOrder.getUserId();Long voucherId = voucherOrder.getVoucherId();//查询订单int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();//判断是否存在if (count > 0) {log.error("不可重复购买");}//扣减库存boolean success = seckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).gt("stock", 0).update();if (!success) {log.error("库存不足");}//保存订单this.save(voucherOrder);}
}

我觉得真的还是太麻烦了。。。而且我遇到了很多次bug,反正都跟线程池有关系,自己修改bug的能力一般,耽误了不少时间,这方面能力要提高。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/340446.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

从零学Java 集合概述

Java 集合概述 文章目录 Java 集合概述1 什么是集合?2 Collection体系集合2.1 Collection父接口2.1.1 常用方法2.1.2 Iterator 接口 1 什么是集合? 概念&#xff1a;对象的容器&#xff0c;定义了对多个对象进行操作的常用方法&#xff1b;可实现数组的功能。 和数组区别&…

MES系统数据采集的几种方式

生产制造执行MES系统具有能够帮助企业实现生产数据收集与分析、生产计划管理、生产过程监控等的功能板块&#xff0c;在这里小编就不一一介绍了&#xff0c;主要讲讲它的数据采集功能板块&#xff0c;可以说&#xff0c;数据采集是该系统进行数据统计与生产管理等后续工作的基础…

ELF文件格式以及交叉编译工具链常用工具

windows下的可执行文件是.exe文件. .data段放置的是初始化的全局变量和初始化的静态局部变量&#xff1b; .bss段放置的是未初始化的全局变量和未初始化的静态局部变量&#xff1b;因为未初始化的变量会放在.bss段中统一置0. .text段放语句&#xff0c;比如a; 局部变量则是…

uniapp打包h5部署到服务器

在学习uniapp&#xff0c;部署前后端分离项目。将h5的dist文件打包好后一直在考虑如何通过nginx反向代理到后端接口&#xff0c;整了半天也没整成。最后才发现&#xff0c;uniapp打包的h5页面包好像不需要反向代理到后端接口&#xff0c;只需要通过nginx将dist下的h5包代理了&a…

掌握Sketch:软件介绍与实用技巧分享

Sketch是最好的UI软件之一。它可以快速交互迭代&#xff0c;每个页面之间的小部件可以直接复制粘贴并修改。在整体架构布局中&#xff0c;可以直接下载很多Mocaup模板&#xff0c;所以非常快。这个工具完全是为应用程序设计的&#xff0c;比PS好得多。 如果你不知道sketch软件…

谷粒学院项目redirect_uri 参数错误微信二维码登录

谷粒学院项目redirect_uri 参数错误_redirect_uri": "http%3a%2f%2fguli.shop%2fapi%2fuce-CSDN博客 修改本地配置 # &#xfffd;&#xfffd;&#xfffd;&#xfffd;˿&#xfffd; server.port8160 # &#xfffd;&#xfffd;&#xfffd;&#xfffd;&#x…

VBA中类的解读及应用第八讲:实现定时器功能的自定义类事件

《VBA中类的解读及应用》教程【10165646】是我推出的第五套教程&#xff0c;目前已经是第一版修订了。这套教程定位于最高级&#xff0c;是学完初级&#xff0c;中级后的教程。 类&#xff0c;是非常抽象的&#xff0c;更具研究的价值。随着我们学习、应用VBA的深入&#xff0…

机器学习_8、支持向量机

支持向量机解决鸢尾花数据集分类问题 # 导入鸢尾花数据集 from sklearn.datasets import load_iris import pandas as pd import numpy as npiris_data load_iris() Xiris_data.data yiris_data.target# 划分训练集与测试集 from sklearn.model_selection import train_test_…

软件测试|使用Pytest、Allure Step和Allure Attach创建详细测试报告

引言 在软件开发过程中&#xff0c;测试是不可或缺的一部分。为了更好地展示测试结果并定位问题&#xff0c;结合Pytest测试框架和Allure测试报告工具可以创建清晰、详细的测试报告。本文将介绍如何使用Pytest、Allure的allure.step()和allure.attach()功能来创建具有丰富信息…

大龄码农的业余作品:升讯威在线客服系统:系统架构设计

本系列文章详细介绍使用 .net core 和 WPF 开发 升讯威在线客服与营销系统 的过程。本产品已经成熟稳定并投入商用&#xff0c;并提供了多国语言版本&#xff0c;服务了一些海外客户。 本篇主要介绍系统的技术架构&#xff0c;从较高的抽象层次上解释我是怎样设计实现这样一套…

蓝桥杯练习题(二)

&#x1f4d1;前言 本文主要是【算法】——蓝桥杯练习题&#xff08;二&#xff09;的文章&#xff0c;如果有什么需要改进的地方还请大佬指出⛺️ &#x1f3ac;作者简介&#xff1a;大家好&#xff0c;我是听风与他&#x1f947; ☁️博客首页&#xff1a;CSDN主页听风与他 …

C#编程-实现多线程

实现多线程 多线程帮助同时执行各种操作。这为用户节省时间。多线程程序包括一个主线程和其他用户定义的线程以同时执行多个任务。 微处理器为执行的进程分配内存。每个进程占有内存中它们自己的地址空间。但是,所有在进程中的线程占有相同的地址空间。多线程允许在一个程序…