redisson的延时队列机制简述

概述

业务中经常会遇到一些延迟执行的需求;通常想到的都是rabbitmq或者rocketmq的延迟消息;
但是系统中不一定集成了mq,但为了控制分布式下的并发,一般redis都是有集成的;
rediskey过期监听那个时间不准确,在集群环境下节点挂了也容易丢失;

那么用redisson的延迟队列,正好可以用来解决轻量级的延时消息;
简单的来说就是消费者生产了一个消息任务,塞到ZSet里(用当前时间戳+延迟时间作为分数),等时间到了,就会放到任务List中,然后消费者真正去执行任务都是从任务List中获取任务;

redisson中的消费者并不是一直轮询获取任务;而是有具体时间的延迟任务,时间到了去任务队列中获取任务;

redisson延时任务机制简述

生产者先将任务pushdelay_queue_timeout等待队列中,延迟时间到了,消费者会把任务从timeout队列挪到SANYOU任务队列中(消费者实际获取任务的队列),然后消费者就能拿到最终要执行的任务了;

这里具体要说的就是客户端通知和获取机制;
消费者在启动时通常都会去get一下队列,达到订阅队列的目的;

RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue("SANYOU");
RDelayedQueue<String> delayQueue = redissonClient.getDelayedQueue(blockingQueue);

这样做的目的:
消费者订阅队列,从delay_queue_timeout等待延迟队列中将已经到达时间的任务挪到真正的任务List队列中,然后再将delay_queue_timeout队列中第一个(也就是第一个要执行的)的任务的时间拿到,用这个时间开启一个延迟任务,时间到了之后,会发布一个消息到时间通知channel中;然后客户端监听到这个channel中的消息后,会再次重复上述步骤,让delay_queue_timeout中的任务,可以都放到真正的任务List队列中;

这样有一个好处就是不用一直while扫描等待,客户端的延迟任务时间和delay_queue_timeout中的延迟时间是一样的,可以精准利用cpu,理论上是没有延迟的,但是实际消息数量大量增加,消费者消费比较慢,还是会造成延迟任务消费延迟;

另外由于客户端都是用lua脚本去redis的同一个List队列中获取任务,lua脚本在redis中都是原子任务,而且redis真正的操作是单线程的,所以不会存在任务广播情况(并发获取时,一个任务不会被多个消费者同时拿到);

捞一张图片
在这里插入图片描述

代码Demo


import cn.hutool.extra.spring.SpringUtil;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;@Slf4j
@Component
public class RedissonDelayQueueConfig implements InitializingBean {@Resourceprivate RedissonClient redissonClient;//延时队列mapprivate final Map<String, RDelayedQueue<DelayMessageDTO>> delayQueueMap = new ConcurrentHashMap<>(16);/*** 消费者初始化所有队列,订阅对应的队列,并开启第一个过期任务的过期时间对应的延迟任务*/@PostConstructpublic void reScheduleDelayedTasks() {DelayQueueEnum[] queueEnums = DelayQueueEnum.values();for (DelayQueueEnum queueEnum : queueEnums) {RBlockingDeque<Object> blockingDeque = redissonClient.getBlockingDeque(queueEnum.getCode());RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);}}@Overridepublic void afterPropertiesSet() {// 有新的延迟队列在这里添加,队列消费类需要继承DelayQueueConsumerDelayQueueEnum[] queueEnums = DelayQueueEnum.values();for (DelayQueueEnum queueEnum : queueEnums) {DelayQueueConsumer delayQueueConsumer = SpringUtil.getBean(queueEnum.getBeanName());if (delayQueueConsumer == null) {throw new ServiceException("queueName=" + queueEnum.getBeanName() + ",delayQueueConsumer=null,请检查配置...");}// Redisson的延时队列是对另一个队列的再包装,使用时要先将延时消息添加到延时队列中,当延时队列中的消息达到设定的延时时间后,// 该延时消息才会进行进入到被包装队列中,因此,我们只需要对被包装队列进行监听即可。RBlockingQueue<DelayMessageDTO> rBlockingQueue = redissonClient.getBlockingDeque(queueEnum.getCode());//消费者初始化队列RDelayedQueue<DelayMessageDTO> rDelayedQueue = redissonClient.getDelayedQueue(rBlockingQueue);//set到map中方便获取delayQueueMap.put(queueEnum.getCode(), rDelayedQueue);// 订阅新元素的到来,调用的是takeAsync(),异步执行rBlockingQueue.subscribeOnElements(delayQueueConsumer::execute);}}public RedissonClient getRedissonClient() {return redissonClient;}public Map<String, RDelayedQueue<DelayMessageDTO>> getDelayQueueMap() {return delayQueueMap;}
}import cn.hutool.core.date.DateUtil;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;import javax.annotation.Resource;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;@Slf4j
@Component
public class DelayQueueUtil {private static RedissonDelayQueueConfig redissonDelayQueueConfig;@Resourcepublic void setRedissonDelayQueueConfig(RedissonDelayQueueConfig redissonDelayQueueConfig) {DelayQueueUtil.redissonDelayQueueConfig = redissonDelayQueueConfig;}private static Map<String, RDelayedQueue<DelayMessageDTO>> getDelayQueueMap() {if(null == redissonDelayQueueConfig) return Collections.emptyMap();return redissonDelayQueueConfig.getDelayQueueMap();}private static RedissonClient getRedissonClient() {if(null == redissonDelayQueueConfig) return null;return redissonDelayQueueConfig.getRedissonClient();}/*** 添加延迟消息*/public static void addDelayMessage(DelayMessageDTO delayMessage) {log.info("delayMessage={}", delayMessage);Assert.isTrue(getDelayQueueMap().containsKey(delayMessage.getQueueName()), "队列不存在");delayMessage.setCreateTime(DateUtil.now());if(null == delayMessage.getTimeUnit()){delayMessage.setTimeUnit(TimeUnit.SECONDS);}RDelayedQueue<DelayMessageDTO> rDelayedQueue = getDelayQueueMap().get(delayMessage.getQueueName());//移除相同的消息rDelayedQueue.remove(delayMessage);//添加消息rDelayedQueue.offer(delayMessage, delayMessage.getDelayTime(), delayMessage.getTimeUnit());}/*** 移除指定队列中的消息*/public static void removeDelayMessage(DelayMessageDTO delayMessage) {log.info("取消:delayMessage={}", delayMessage);if (!getDelayQueueMap().containsKey(delayMessage.getQueueName())) {log.error("queueName={},该延迟队列不存在,请确认后再试...", delayMessage.getQueueName());return;}RDelayedQueue<DelayMessageDTO> rDelayedQueue = getDelayQueueMap().get(delayMessage.getQueueName());rDelayedQueue.remove(delayMessage);removeDelayQueue(delayMessage);}/*** 从所有队列中删除消息*/public static void removeDelayQueue(DelayMessageDTO value) {DelayQueueEnum[] queueEnums = DelayQueueEnum.values();for (DelayQueueEnum queueEnum : queueEnums) {RBlockingDeque<Object> blockingDeque = getRedissonClient().getBlockingDeque(queueEnum.getCode());RDelayedQueue<Object> delayedQueue = getRedissonClient().getDelayedQueue(blockingDeque);delayedQueue.remove(value);}}}

参考了大佬的博文
https://lhalcyon.com/delay-task/index.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/416097.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

在 wsl-ubuntu 里通过 docker 启动 gpu-jupyter

在 wsl-ubuntu 里通过 docker 启动 gpu-jupyter 0. 背景1. 安装 docker-ce2. 安装 NVIDIA Container Toolkit3. 使用 nvidia-ctk 命令配置容器运行4. 通过 docker 运行 nvidia-smi5. 运行 gpu-jupyter6. 访问 gpu-jupyter7. 测试 gpu-jupyter 是否可以访问 cuda 0. 背景 今天突…

Mysql详细安装步骤

Linux 安装 MySQL【超详细版】 ​编辑 我叫BuGu    2023-05-11 16:48:10 发布 一、安装 MySQL 的准备工作 1. 查看系统版本 cat /etc/redhat-release2. 查看系统是否已经安装过 MySQL 查看是否安装了 MySQL rpm -qa | grep mysql查看是否有安装 mariadb,该软件与 MySQ…

qt学习:进度条,水平滑动条,垂直滑动条+rgb调试实战

目录 水平滑动条&#xff0c;垂直滑动条 常用信号 进度条 常用信号 修改进度条 例子 rgb调色 配置ui界面 编写3个进度条的事件函数 添加链表容器和按钮索引 在.h里的类定义 初始化链表容器和按钮索引 编写添加颜色的按钮点击事件函数 效果 水平滑动条&#xff0c…

ChatGPT正确打开方式与GPT-4.5的key最新获取方式

前言 前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家&#xff1a;https://www.captainbed.cn/z ChatGPT体验地址 文章目录 前言4.5key价格泄漏ChatGPT4.0使用地址ChatGPT正确打开方式最新功能语音助手存档…

【REMB 】翻译:草案remb-03

REMB REMB消息 以及 绝对时间戳选项 在带宽估计中的使用 :an absolute-value timestamp option for use in bandwidth estimatoin. 接收方带宽估计的RTCP消息 REMB 这位大神翻译的更好。 RTCP message for Receiver Estimated Maximum Bitrate draft-alvestrand-rmcat-remb-03…

.NetCore Flurl.Http 4.0.0 以上管理客户端

参考原文地址&#xff1a;Managing Clients - Flurl 管理客户端 Flurl.Http 构建在堆栈之上System.Net.Http。如果您熟悉HttpClient&#xff0c;那么您可能听说过这个建议&#xff1a;不要为每个请求创建一个新客户端&#xff1b;重复使用它们&#xff0c;否则将面临后…

自动化测试框架有哪些?

前言 自动化测试常用的Python框架有哪些&#xff1f;常用的框架有Robot Framework、Pytest、UnitTest/PyUnit、Behave、Lettuce。Pytest、Robot Framework和UnitTest主要用于功能与单元测试&#xff0c;Lettuce和Behave仅适用于行为驱动测试。 一、Robot Framework Python测…

6.4.2转换文件

6.4.2转换文件 利用Swf2VideoConverter2可以很方便地将Flash动画(*.swf)转换为其它的视频格式。 1&#xff0e;单击“添加”按钮&#xff0c;在弹出的下拉菜单中选择“添加文件”&#xff0c;在弹出的“Open Swf Files(打开Swf文件)”窗口中选择swf文件(如&#xff1a;那些花…

ArcGIS Pro中怎么加载在线地图

当我们在制图的时候&#xff0c;有的时候需要加载在线地图&#xff0c;在ArcGIS Pro中加载在线地图的方式有很多&#xff0c;这里为大家介绍一下加载的方法&#xff0c;希望能对你有所帮助。 加载底图 在菜单栏上选择地图&#xff0c;点击底图&#xff0c;可以看到所有可加载…

网络爬虫采集工具

在当今数字化的时代&#xff0c;获取海量数据对于企业、学术界和个人都至关重要。网络爬虫成为一种强大的工具&#xff0c;能够从互联网上抓取并提取所需的信息。本文将专心分享关于网络爬虫采集数据的全面指南&#xff0c;深入探讨其原理、应用场景以及使用过程中可能遇到的挑…

Docker 安装 MySQ

Docker 安装 MySQL MySQL 是世界上最受欢迎的开源数据库。凭借其可靠性、易用性和性能&#xff0c;MySQL 已成为 Web 应用程序的数据库优先选择。 1、查看可用的 MySQL 版本 访问 MySQL 镜像库地址&#xff1a;https://hub.docker.com/_/mysql?tabtags 。 可以通过 Sort b…

多示例学习 (multi-instance learning, MIL) 学习路线 (归类、重点文章列举、持续更新)

文章目录 0 要点1 多示例背景介绍2 理论MIL概述2.1 传统MIL方法2.2 注意力网络2.3 对比学习2.4 介入学习2.5 强化学习2.6 GAN 3 应用MIL概述3.1 全幻灯片分类3.2 视频异常检测3.3 图像分类3.4 调制识别3.4 Benchmark 4 MIL交叉领域4.1 多示例多标签4.2 多示例偏标签4.3 多示例分…