基于Rabbitmq和Redis的延迟消息实现

1 基于Rabbitmq延迟消息实现

支付时间设置为30,未支付的消息会积压在mq中,给mq带来巨大压力。我们可以利用Rabbitmq的延迟队列插件实现消息前一分钟尽快处理
在这里插入图片描述
在这里插入图片描述

1.1定义延迟消息实体

由于我们要多次发送延迟消息,因此需要先定义一个记录消息延迟时间的消息体

@Data
public class MultiDelayMessage<T> {/*** 消息体*/private T data;/*** 记录延迟时间的集合*/private List<Long> delayMillis;public MultiDelayMessage(T data, List<Long> delayMillis) {this.data = data;this.delayMillis = delayMillis;}public static <T> MultiDelayMessage<T> of(T data, Long ... delayMillis){return new MultiDelayMessage<>(data, CollUtils.newArrayList(delayMillis));}/*** 获取并移除下一个延迟时间* @return 队列中的第一个延迟时间*/public Long removeNextDelay(){return delayMillis.remove(0);}/*** 是否还有下一个延迟时间*/public boolean hasNextDelay(){return !delayMillis.isEmpty();}
}

1.2 定义常量,用于记录交换机、队列、RoutingKey等常量

package com.hmall.trade.constants;public interface MqConstants {String DELAY_EXCHANGE = "trade.delay.topic";String DELAY_ORDER_QUEUE = "trade.order.delay.queue";String DELAY_ORDER_ROUTING_KEY = "order.query";
}

1.3 抽取mq配置到nacos中

spring:rabbitmq:host: ${hm.mq.host:192.168.150.101} # 主机名port: ${hm.mq.port:5672} # 端口virtual-host: ${hm.mq.vhost:/hmall} # 虚拟主机username: ${hm.mq.un:hmall} # 用户名password: ${hm.mq.pw:123} # 密码listener:simple:prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

1.4 定义消息处理器

使用延迟消息处理器发送消息
在这里插入图片描述
在这里插入图片描述

1.5 消息监听与延迟消息再次发送

在这里插入图片描述
在这里插入图片描述

2 延迟消息实现

DelayQueue:基于JVM,保存在内存中,会出现消息丢失
在这里插入图片描述

Rabbitmq的延迟任务:基于TTL和死信交换机
在这里插入图片描述

2.1 redis的延迟任务:基于zset的去重和排序功能

在这里插入图片描述
1.为什么任务需要存储在数据库中?
延迟任务是一个通用的服务,任何有延迟需求的任务都可以调用该服务,内存数据库的存储是有限的,需要考虑数据持久化的问题,存储数据库中是一种数据安全的考虑

2.为什么使用redis中的两种数据类型,list和zset?

  • 原因一: list存储立即执行的任务,zset存储未来的数据
  • 原因二:任务量过大以后,zset的性能会下降

时间复杂度:执行时间(次数) 随着数据规模增长的变化趋势

  • 操作redis中的list命令LPUSH: 时间复杂度: O(1)
  • 操作redis中的zset命令zadd: 时间复杂度: (Mlog(n))

2.2 设计mybatis映射实体类:

/*** 版本号,用乐观锁*/@Versionprivate Integer version;乐观锁支持:
/*** mybatis-plus乐观锁支持* @return*/
@Bean
public MybatisPlusInterceptor optimisticLockerInterceptor(){MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();interceptor.addInnerInterceptor(new OptimisticLockerInnerInterceptor());return interceptor;
}

2.3 创建task类,用于接收添加任务的参数

@Data
public class Task implements Serializable {/*** 任务id*/private Long taskId;/*** 类型*/private Integer taskType;/*** 优先级*/private Integer priority;/*** 执行id*/private long executeTime;/*** task参数*/private byte[] parameters;}

2.4 添加任务

2.4.1 添加任务到数据库中

addTaskToDb(task);修改任务表和日志表

@Autowiredprivate TaskinfoMapper taskinfoMapper;@Autowiredprivate TaskinfoLogsMapper taskinfoLogsMapper;/*** 添加任务到数据库中** @param task* @return*/private boolean addTaskToDb(Task task) {boolean flag = false;try {//保存任务表Taskinfo taskinfo = new Taskinfo();BeanUtils.copyProperties(task, taskinfo);taskinfo.setExecuteTime(new Date(task.getExecuteTime()));taskinfoMapper.insert(taskinfo);//设置taskIDtask.setTaskId(taskinfo.getTaskId());//保存任务日志数据TaskinfoLogs taskinfoLogs = new TaskinfoLogs();BeanUtils.copyProperties(taskinfo, taskinfoLogs);taskinfoLogs.setVersion(1);taskinfoLogs.setStatus(ScheduleConstants.SCHEDULED);taskinfoLogsMapper.insert(taskinfoLogs);flag = true;} catch (Exception e) {e.printStackTrace();}return flag;}
2.4.2 添加任务到redis

addTaskToCache(task);判断任务执行之间是否在现在还是未来五分钟内

@Autowiredprivate CacheService cacheService;/*** 把任务添加到redis中** @param task*/private void addTaskToCache(Task task) {String key = task.getTaskType() + "_" + task.getPriority();//获取5分钟之后的时间  毫秒值Calendar calendar = Calendar.getInstance();calendar.add(Calendar.MINUTE, 5);long nextScheduleTime = calendar.getTimeInMillis();//2.1 如果任务的执行时间小于等于当前时间,存入listif (task.getExecuteTime() <= System.currentTimeMillis()) {cacheService.lLeftPush(ScheduleConstants.TOPIC + key, JSON.toJSONString(task));} else if (task.getExecuteTime() <= nextScheduleTime) {//2.2 如果任务的执行时间大于当前时间 && 小于等于预设时间(未来5分钟) 存入zset中cacheService.zAdd(ScheduleConstants.FUTURE + key, JSON.toJSONString(task), task.getExecuteTime());}}

2.5 删除任务

1、删除数据库任务表,更改日志表任务状态
2、删除list或者zset中的任务

在TaskService中添加方法

/*** 取消任务* @param taskId        任务id* @return              取消结果*/
public boolean cancelTask(long taskId);
/*** 取消任务* @param taskId* @return*/
@Override
public boolean cancelTask(long taskId) {boolean flag = false;//删除任务,更新日志Task task = updateDb(taskId,ScheduleConstants.EXECUTED);//删除redis的数据if(task != null){removeTaskFromCache(task);flag = true;}return false;
}/*** 删除redis中的任务数据* @param task*/
private void removeTaskFromCache(Task task) {String key = task.getTaskType()+"_"+task.getPriority();if(task.getExecuteTime()<=System.currentTimeMillis()){cacheService.lRemove(ScheduleConstants.TOPIC+key,0,JSON.toJSONString(task));}else {cacheService.zRemove(ScheduleConstants.FUTURE+key, JSON.toJSONString(task));}
}/*** 删除任务,更新任务日志状态* @param taskId* @param status* @return*/
private Task updateDb(long taskId, int status) {Task task = null;try {//删除任务taskinfoMapper.deleteById(taskId);TaskinfoLogs taskinfoLogs = taskinfoLogsMapper.selectById(taskId);taskinfoLogs.setStatus(status);taskinfoLogsMapper.updateById(taskinfoLogs);task = new Task();BeanUtils.copyProperties(taskinfoLogs,task);task.setExecuteTime(taskinfoLogs.getExecuteTime().getTime());}catch (Exception e){log.error("task cancel exception taskid={}",taskId);}return task;}

2.6 消费任务

1、删除list中的数据
2、使用updateDB删除任务表、跟新日志表

在TaskService中添加方法

/*** 按照类型和优先级来拉取任务* @param type* @param priority* @return*/
public Task poll(int type,int priority);

实现

/*** 按照类型和优先级拉取任务* @return*/
@Override
public Task poll(int type,int priority) {Task task = null;try {String key = type+"_"+priority;String task_json = cacheService.lRightPop(ScheduleConstants.TOPIC + key);if(StringUtils.isNotBlank(task_json)){task = JSON.parseObject(task_json, Task.class);//更新数据库信息updateDb(task.getTaskId(),ScheduleConstants.EXECUTED);}}catch (Exception e){e.printStackTrace();log.error("poll task exception");}return task;
}

2.7 未来定时任务更新-reids管道

减少与redis的交互次数
1、在引导类中添加开启任务调度注解:@EnableScheduling
2、在service中添加定时任务 @Scheduled(cron = “0 */1 * * * ?”),每分钟一次

@Scheduled(cron = "0 */1 * * * ?")
public void refresh() {System.out.println(System.currentTimeMillis() / 1000 + "执行了定时任务");// 获取所有未来数据集合的key值Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_*for (String futureKey : futureKeys) { // future_250_250String topicKey = ScheduleConstants.TOPIC + futureKey.split(ScheduleConstants.FUTURE)[1];//获取该组key下当前需要消费的任务数据Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());if (!tasks.isEmpty()) {//将这些任务数据添加到消费者队列中cacheService.refreshWithPipeline(futureKey, topicKey, tasks);System.out.println("成功的将" + futureKey + "下的当前需要执行的任务数据刷新到" + topicKey + "下");}}
}
public List<Object> refreshWithPipeline(String future_key,String topic_key,Collection<String> values){List<Object> objects = stringRedisTemplate.executePipelined(new RedisCallback<Object>() {@Nullable@Overridepublic Object doInRedis(RedisConnection redisConnection) throws DataAccessException {StringRedisConnection stringRedisConnection = (StringRedisConnection)redisConnection;String[] strings = values.toArray(new String[values.size()]);stringRedisConnection.rPush(topic_key,strings);stringRedisConnection.zRem(future_key,strings);return null;}});return objects;}

总结

1、使用rebbitmq使用的场景是在支付和订单微服务中,用于实现消息可以延迟30分钟付款的功能。并借用该中间件的插件实现支付的异步下单功能,并可以快速处理前几分钟,防止消息堆积
2、使用redis是基于zset的去重和排序功能,相当于将一定数据的保存在数据库,使用定时任务同步数据库符合五分钟的任务到zset中,然后,在在zest中定时更新可以运行的任务到list集合中,相当于实现了延迟功能和缓存功能。
3、第二种还可以扩展为将rabbitmq中等待时间较长的数据存到redis中,然后定时的去同步redis中的数据到数据库中,防止消息堆积。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/175173.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

准备搞OpenStack了,先装一台最新的Ubuntu 23.10

正文共&#xff1a;1113 字 25 图&#xff0c;预估阅读时间&#xff1a;2 分钟 依稀记得前面发了一篇Ubuntu的安装文档&#xff08;66%的经验丰富开发者和69%的学生更喜欢的Ubuntu的安装初体验&#xff09;&#xff0c;当时安装的是20.04.3的版本&#xff0c;现在看来已经是非常…

【机器学习】决策树算法理论:算法原理、信息熵、信息增益、预剪枝、后剪枝、算法选择

1. 决策树概念 通过不断的划分条件来进行分类&#xff0c;决策树最关键的是找出那些对结果影响最大的条件&#xff0c;放到前面。 我举个列子来帮助大家理解&#xff0c;我现在给我女儿介绍了一个相亲对象&#xff0c;她根据下面这张决策树图来进行选择。比如年龄是女儿择偶更…

Adobe家里的“3D“建模工 | Dimension

今天&#xff0c;我们来谈谈一款在Adobe系列中比肩C4D的高级3D软件的存在—— Dimension。 Adobe Dimension &#xff0c;其定位是一款与Photoshop以及Illustrator相搭配的3D绘图软件。 Adobe Dimensions与一般的3D绘图软件相较之下&#xff0c;在操作界面在功能上有点不大相同…

保护您的Google账号安全:检查和加固措施

简介&#xff1a;随着我们在日常生活中越来越依赖于Google账号&#xff0c;我们的个人信息和敏感数据也变得越来越容易受到威胁。为了确保您的Google账号的安全性&#xff0c;本文将介绍一些简单但有效的方法&#xff0c;帮助您检查和加固您的Google账号。 --- 在数字时代&am…

【uniapp】确认弹出框,选择确定和取消

代码如下&#xff1a; <view style"display: flex; justify-content: space-around;"><button class"button" click"submit">t提交</button> </view>submit(){let thatthisuni.showModal({title: 提示&#xff1a;,con…

MySQL MVCC机制详解

MySQL MVCC机制详解 MVCC, 是Multi Version Concurrency Control的缩写&#xff0c;其含义是多版本并发控制。这一概念的提出是为了使得MySQL可以实现RC隔离级别和RR隔离级别。 这里回顾一下MySQL的事务&#xff0c; MySQL的隔离级别和各种隔离级别所存在的问题。 事务是由 …

HDP集群Kafka开启SASLPLAINTEXT安全认证

hdp页面修改kafka配置 java代码连接kafka增加对应的认证信息 props.put("security.protocol","SASL_PLAINTEXT");props.put("sasl.mechanism","PLAIN");props.put("sasl.jaas.config","org.apache.kafka.common.securi…

基恩士软件的基本操作(三,监控台调试)

目录 模拟器 模拟器编辑 plc的传输 监控器 在线编辑 程序出现问题时的排查方法 接点跳转功能 交叉参考功能 关系映射功能 程序监控器 登录监控器&#xff08;查看与修改软元件的值&#xff09; 批量监控器 实时时序图 单元监控器 I/O单元监控器 运动单元监控器…

AIGC ChatGPT4 生成Python可视化分析

使用Python进行数据分析,代码可以通过ChatGPT4来完成。 例如Prompt: 产品 销量 P1 48 P2 53 P3 82 P4 57 P5 89 P6 86 P7 30 P8 79 P9 96 将上述数据用Python通过可视化的图表来进行展示 完整代码如下: import matplotlib.pyplot as pltpr…

交换机基础知识之安全配置

交换机在网络基础设施中扮演着重要角色&#xff0c;它促进了设备之间数据包的流动。正因此&#xff0c;采取适当的安全措施来保护网络免受未经授权的访问和潜在攻击至关重要。本文将全面解读交换机基础安全配置知识&#xff0c;并提供实践方案&#xff0c;以保证安全的网络环境…

自定义GPT已经出现,并将影响人工智能的一切,做好被挑战的准备了吗?

原创 | 文 BFT机器人 OpenAI凭借最新突破&#xff1a;定制GPT站在创新的最前沿。预示着个性化数字协助的新时代到来&#xff0c;ChatGPT以前所未有的精度来满足个人需求和专业需求。 从本质上讲&#xff0c;自定义GPT是之前的ChatGPT的高度专业化版本或代理&#xff0c;但自定…

单片机与PLC的区别有哪些?

单片机与PLC的区别有哪些? 什么是单片机&#xff1f; 单片机&#xff08;Microcontroller&#xff0c;缩写MCU&#xff09;是一种集成了中央处理器&#xff08;CPU&#xff09;、存储器和输入/输出接口等功能模块的微型计算机系统。它通常被用于嵌入式系统和控制系统中&#x…