redission-delay-queue使用及原理

news/2025/2/21 16:50:42/文章来源:https://www.cnblogs.com/gmt-hao/p/18729524

 

1.引入jar包

<dependency><groupId>xxxx.delay</groupId><artifactId>redisson-delayed-queue</artifactId><version>1.0.0-SNAPSHOT</version>
</dependency>

 

2.客户端代码开发

2.1:新增实现BaseTask的bean 参数由业务自行决定

public class TestDto extends BaseTask {private String aa1 = "aa1";private String aa2 = "aa2";;private String aa3 = "aa3";;private String aa4 = "aa4";;private String aa5 = "aa5";;private String aa6 = "aa6";;private String aa7 = "aa7";;
}

2.2客户端发送延时任务

TestDto testDto = new TestDto();
//todo构建参数
redisDelayQueue.addQueue("test1", testDto, 5, TimeUnit.SECONDS);

 

3.服务端代码开发 加上@RedisDelayListener注解接收即可(租户已自动传递,无需关心)

@Component
public class TestDelayListener {@RedisDelayListener(topic = "test1")public void test(TestDto testDto) {System.out.println("收到test1的消息"+ JSON.toJSONString(testDto));}
}

上面我们知道如何使用,很简单,只需要调用addQueue然后定义接收方法并加上注解就可以进行业务开发了,这里再看下如何实现的。

流程图:https://www.processon.com/v/65af2dfd7fce4a25f8dd239f

 

 

核心实现是对一个channel一个zset两个list的操作。

首先在初始化的时候,我们会加载所有被RedisDelayListener的方法并初始化redis延时队列引擎。代码如下:

public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {if (!this.nonAnnotatedClasses.contains(bean.getClass())) {Class<?> targetClass = AopUtils.getTargetClass(bean);Map<Method, Set<RedisDelayListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,(MethodIntrospector.MetadataLookup<Set<RedisDelayListener>>) method -> {Set<RedisDelayListener> listenerMethods = findListenerAnnotations(method);return (!listenerMethods.isEmpty() ? listenerMethods : null);});if (annotatedMethods.isEmpty()) {this.nonAnnotatedClasses.add(bean.getClass());//            log.info("当前class类型:{}无redisDelayListener注解 " + bean.getClass());} else {for (Method method : annotatedMethods.keySet()) {RedisDelayListener customAnnotation = method.getAnnotation(RedisDelayListener.class);if (customAnnotation != null) {String topic = customAnnotation.topic();//不允许有重复topic配置int retryTimes = customAnnotation.retryTime();long reTryDelay = customAnnotation.reTryDelay();Class<?> parameterType = method.getParameterTypes()[0];if(!BaseTask.class.isAssignableFrom(parameterType)) {throw new ClassCastException("redis延时任务对象:"+parameterType.getName()+"必须实现BaseTask");}RedisDelayEngine redisDelayEngine = new RedisDelayEngine(redisDelayQueue,topic, method, parameterType);redisDelayEngine.setTryTimes(retryTimes);redisDelayEngine.setReTryDelay(reTryDelay);}}}}return bean;
}

这里在bean初始化后触发,会校验当前bean是否有redisDelayListener注解,有则初始化RedisDelayEngine:

public RedisDelayEngine(RedisDelayBlockingQueue redisDelayQueue, String topic, Method method, Class<T> classType) {this.redisDelayQueue = redisDelayQueue;this.topic = topic;this.classType = classType;this.method = method;// 阻塞队列和延时队列初始化redisDelayQueue.getDelayQueue(topic);//启动延时线程ThreadPools.getInstance().execute(new DelayThread());
}

这里会触发阻塞队列和延时队列初始化,然后启动消费消息的任务,先看队列初始化:

RDelayedQueue<String> getDelayQueue(String queueName) {return delayedQueueMap.computeIfAbsent(queueName, k -> {RBlockingQueue<String> blockingQueue = blockingQueueMap.computeIfAbsent(queueName,t -> redissonClient.getBlockingQueue(queueName));return redissonClient.getDelayedQueue(blockingQueue);});
}

 

public <V> RDelayedQueue<V> getDelayedQueue(RQueue<V> destinationQueue) {if (destinationQueue == null) {throw new NullPointerException();} else {return new RedissonDelayedQueue(this.queueTransferService, destinationQueue.getCodec(), this.commandExecutor, destinationQueue.getName());}
}

 

protected RedissonDelayedQueue(QueueTransferService queueTransferService, Codec codec, final CommandAsyncExecutor commandExecutor, String name) {super(codec, commandExecutor, name);//延时队列核心代码QueueTransferTask task = new QueueTransferTask(commandExecutor.getConnectionManager()) {protected RFuture<Long> pushTaskAsync() {return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,"local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); "//拿到zset中过期的值列表+ "if #expiredValues > 0 then " //如果有+ "for i, v in ipairs(expiredValues) do "+ "local randomId, value = struct.unpack('dLc0', v);"//解构消息,在提交任务时打包的消息+ "redis.call('rpush', KEYS[1], value);" //放入无前缀的list 队头+ "redis.call('lrem', KEYS[3], 1, v);"//移除带前缀list 队尾元素+ "end; "+ "redis.call('zrem', KEYS[2], unpack(expiredValues));" //移除zset中本次读取的过期元素+ "end; "// get startTime from scheduler queue head task+ "local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); "//取zset最小分值的元素+ "if v[1] ~= nil then "+ "return v[2]; " //返回分值,即过期时间+ "end "+ "return nil;",Arrays.asList(getRawName(), timeoutSetName, queueName),System.currentTimeMillis(), 100);}protected RTopic getTopic() {return new RedissonTopic(LongCodec.INSTANCE, commandExecutor, RedissonDelayedQueue.this.channelName);}};queueTransferService.schedule(this.queueName, task);this.queueTransferService = queueTransferService;
}

这里主要看下RedissonDelayedQueue初始化部分,主要新建了一个task,这个task即上图中的pushTask,是redission实现延时队列的核心部分,task提供了两个方法pushTaskAsync和getTopic,getTopic方法很简单,只是新建了一个topic名称为redisson_delay_queue_channel的对象,这个下面会用到,主要先看下pushTaskAsync的lua脚本做了什么:

1.拿到zset(redisson_delay_queue_timeout)中的已过期的值(通过score大小筛选,最多前100条)

2.如果本次有已过期的任务则遍历放入无前缀的list中(无前缀的list即是我们后面获取任务的来源),并从有前缀的list中删除

3.从zset中删除查询出的任务

4.如果zset中还有任务,则返回该任务的过期时间(本地需要这个新建定时任务,到期继续触发这个流程)

再看schedule方法:

public synchronized void schedule(String name, QueueTransferTask task) {QueueTransferTask oldTask = tasks.putIfAbsent(name, task);if (oldTask == null) {task.start();} else {oldTask.incUsage();}
}

 

public void start() {RTopic schedulerTopic = getTopic();statusListenerId = schedulerTopic.addListener(new BaseStatusListener() {@Overridepublic void onSubscribe(String channel) {pushTask();}});messageListenerId = schedulerTopic.addListener(Long.class, new MessageListener<Long>() {@Overridepublic void onMessage(CharSequence channel, Long startTime) {scheduleTask(startTime);}});
}

上面的getTopic方法即调用之前任务定义的getTopic(就是上面那个channel)

这里注册监听onSubScribe(订阅时触发)onMessage(新消息时触发)

看pushTask:

private void pushTask() {RFuture<Long> startTimeFuture = pushTaskAsync();startTimeFuture.whenComplete((res, e) -> {if (e != null) {if (e instanceof RedissonShutdownException) {return;}log.error(e.getMessage(), e);scheduleTask(System.currentTimeMillis() + 5 * 1000L);return;}if (res != null) {scheduleTask(res);}});
}

这里首先会调用pushTaskAsync方法,其实就是上面说的lua脚本触发的地方

前面说到,如果还有新的延时任务,会返回最新的一条的到期时间,

然后调用scheduleTask方法:

private void scheduleTask(final Long startTime) {TimeoutTask oldTimeout = lastTimeout.get();if (startTime == null) {return;}if (oldTimeout != null) {oldTimeout.getTask().cancel();}long delay = startTime - System.currentTimeMillis();if (delay > 10) {Timeout timeout = connectionManager.newTimeout(new TimerTask() {                    @Overridepublic void run(Timeout timeout) throws Exception {pushTask();TimeoutTask currentTimeout = lastTimeout.get();if (currentTimeout.getTask() == timeout) {lastTimeout.compareAndSet(currentTimeout, null);}}}, delay, TimeUnit.MILLISECONDS);if (!lastTimeout.compareAndSet(oldTimeout, new TimeoutTask(startTime, timeout))) {timeout.cancel();}} else {pushTask();}
}

如果没到期的任务都会走到这里,如果有老任务会先取消掉(新的一定是比老的先到期),然后新建一个定时任务,放到时间轮里面,到期会触发pushTask,然后继续触发执行上面的lua脚本。

上面这些就是队列的初始化部分,接下来再看启动消费消息的任务:

public class DelayThread implements Runnable {@Overridepublic void run() {try {RBlockingQueue<String> blockingFairQueue = redisDelayQueue.getBlockingQueue(topic);String data = null;while (true) {try {log.info("延时处理获取队列queueName:{}", topic);data = blockingFairQueue.take();} catch (InterruptedException e) {log.error("延时处理获取队列元素异常,队列:{}", topic, e);Thread.currentThread().interrupt();}if(StringUtils.isNotEmpty(data))  {T t = JSON.parseObject(data, classType);if(t == null) {log.error("延时队列元素json解析异常:{}", data);continue;}try {log.info("触发队列:{},任务:{}", topic, t.taskIdentity());Object beanInstance = SpringContextUtil.getBean(method.getDeclaringClass());TenantContext.set(t.getTenantId());method.invoke(beanInstance, t);} catch (Exception exception) {log.error("延时处理元素{}:{}异常", topic, t, exception);errorHandler(t);} finally {TenantContext.clear();}}}} catch (Throwable throwable) {log.error("延时处理线程错误", throwable);}}
}

这里主要是消费消息,流程很简单,每个topic都会在服务端新建一个阻塞队列副本,调用blockingFairQueue.take()阻塞获取任务,当上面的task查到已过期的数据时,则会存入阻塞队列副本,队列有数据则take方法会被唤醒,然后反射调用topic对应的方法(注意:topic对应的方法参数格式必须要可以反序列化成功,否则延时任务会退出

上面是初始化时的主要流程,接下来看addQueue时的代码:

public <T> void addQueue(String queueName, T t, long delay, TimeUnit timeUnit) {if (t != null) {RDelayedQueue<String> delayedQueue = this.getDelayQueue(queueName);log.info("任务入队列,延时时间:{}({}), queueName:{}, task:{}", new Object[]{delay, timeUnit.name(), queueName, JSON.toJSONString(t)});delayedQueue.offer(JSON.toJSONString(t), delay, timeUnit);}
}

这里也会触发队列的初始化流程getDelayQueue,主要看下offer方法:

public void offer(V e, long delay, TimeUnit timeUnit) {get(offerAsync(e, delay, timeUnit));
}
public RFuture<Void> offerAsync(V e, long delay, TimeUnit timeUnit) {if (delay < 0) {throw new IllegalArgumentException("Delay can't be negative");}long delayInMs = timeUnit.toMillis(delay);long timeout = System.currentTimeMillis() + delayInMs;long randomId = ThreadLocalRandom.current().nextLong();return commandExecutor.evalWriteNoRetryAsync(getRawName(), codec, RedisCommands.EVAL_VOID,"local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);" //打包消息体:消息id,消息长度,消息值+ "redis.call('zadd', KEYS[2], ARGV[1], value);"//zset中加入消息及其超时分值+ "redis.call('rpush', KEYS[3], value);" //向带前缀的list中添加消息 // if new object added to queue head when publish its startTime // to all scheduler workers + "local v = redis.call('zrange', KEYS[2], 0, 0); "//取出zset中第一个元素+ "if v[1] == value then " //如果最快过期的元素就是这次发送的消息+ "redis.call('publish', KEYS[4], ARGV[1]); " //channel中发布一下超时时间+ "end;",Arrays.asList(getRawName(), timeoutSetName, queueName, channelName),timeout, randomId, encode(e));
}

看下lua脚本做了什么:

1.zset和带前缀的list重添加消息

2.取出zset最新一条的任务数据

3.校验如果最新一条就是当前自己新增的这一条,channel重发布消息(发布后上面的listener就会接收到新消息通知,最后都会执行pushTask流程)

 

再看下服务端怎么消费的,即上面的blockingFairQueue.take()

take方法会调用org.redisson.RedissonBlockingQueue#take:

public V take() throws InterruptedException {return commandExecutor.getInterrupted(takeAsync());
}
public RFuture<V> takeAsync() {return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.BLPOP_VALUE, getRawName(), 0);
}

往下跟会走到org.redisson.command.RedisExecutor#sendCommand

 

 

 

最终会发现其实调用的就是blpop key 0命令 无限阻塞式获取无前缀列表的任务数据,我们每次任务到期后都会推送到无前缀列表中,本地服务端就在这里pop出来,消费数据。

 

关于选型:

做之前我调研了一下市面上java常用的延时队列,单机实现直接pass, rocketMq等大部分第三方消息中间件公司不想部署机器且kafka实现起来较为复杂,剩下的实现要么基于redis,要么基于数据库,基于数据库实现的比较好的则是quartz,优点在于功能齐全,由于数据库的限制且所有逻辑都基于代码实现,在数据量较大时性能较差,而其很多强大的功能对于当前业务场景没必要,因此选择了较为轻量的redission实现。

redission实现的延时队列优点在于轻量化,其功能多基于redis的zset的数据结构特性以及其支持的发布订阅模式支持,而且能够将更多的压力转移到redis上,但其也有局限性,比如发布订阅没有ack无法保证消息不丢失等,但也可以通过在业务中尽量规避,减少业务处理的时间,比如消费完后推kafka, 虽然延时时间的精确度会降低,但对于当前业务来说相对更为合适,而且考虑到当前需求在较长一段时间内也不会做质的变更,即便有些变动稍加改造也能够满足一些稍复杂的场景,基本可以满足当前需求。

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/887659.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

利用网页爬虫从专业产品论坛提取评论的完整指南

了解如何通过网页爬虫从专业产品论坛提取评论,以获取可操作的洞见、改进策略,并节省时间。利用网页爬虫从专业产品论坛提取评论已经成为企业的游戏规则改变者。它使您能够从多个平台收集客户反馈(https://dataforest.ai/blog/top-web-scraping-use-cases),实时监控情绪,并自…

树的重心(树形dp)

我最开始将n减了1,应为边长有n-1条,但是这会导致计算时出现错误,因为其他地方会用到n#include<iostream> #include<vector> using namespace std; const int N=1e5+5; vector<int>v[N]; int f[N]; int vis[N]; int ans=1e5; int n; int dfs(int x){vis[x]…

仓库管理到底该怎么管?记住这五常六准原则!

仓库管理到底该怎么管? 要是你觉得仓库只是个“堆货的地方”,那可就大错特错了! 想象一下,货物乱成一团,找东西时费劲,出库慢,结果错发了货,那可真的是“大麻烦”啊。 客户打电话来投诉,老板又来催货,这种情况谁遇到谁头疼。 但其实,仓库管理不复杂,只要掌握了一些…

让窗口显示在所有虚拟桌面上

功能如图 C#代码实现如下public partial class MainWindow : Window {public MainWindow(){InitializeComponent();ShowInTaskbar = false;this.Loaded += MainWindow_Loaded;}private void MainWindow_Loaded(object sender, RoutedEventArgs e){IntPtr a = new System.Window…

[Vite] 性能优化

关于HTTP: https://www.cnblogs.com/Answer1215/p/18729206 DNS 打包方式:所有代码(包括 lodash-es、element-ui 等)被打包到一起,部署时只需请求一个或几个大文件。 优点是简单,不需要额外的网络请求;缺点是文件体积大,首次加载延迟较高,而且用户如果之前访问过其他…

python 并发场景梳理

一、多进程(Multiprocessing) 1、使用场景 适合CPU密集型任务,因为每个进程运行在独立的Python解释器中,不受全局解释器锁(GIL)的影响。 2、代码from multiprocessing import Process import timedef task():print("Task is running")time.sleep(10)processes …

Pic2Word

目录概Pic2Word代码Saito K., Sohn K., Zhang X., Li C., Lee C., Saenko K., and Pfister T. Pic2Word: Mapping pictures to words for zero-shot composed image retrieval. CVPR, 2023.概 本文关注的是 Composed Image Retrieval (CIR) 任务: 给定一个 reference (query) i…

SciTech-EECS-BigDataAIML-NN(神经网络): 常用的18种Activation(激活函数)

SciTech-EECS-BigDataAIML-NN(神经网络): 常用的18种Activation(激活函数) 一:简介 二:为什么要用激活函数 三:激活函数的分类 四:常见的几种激活函数 4.1.Sigmoid函数 4.2.Tanh函数 4.3.ReLU函数 4.4.Leaky Relu函数 4.5.PRelu函数 4.6.ELU函数 4.7.SELU函数 4.8.Swish函数…

用了CRM后,业绩不升反滑?问题可能出在这6个地方!

用了CRM系统后,我的公司——客户跟丢了 销售效率下降了 业绩一落千丈哈哈,比我惨的还有谁?! 不过我潜心研究了一阵,终于把这个CRM系统给弄懂了! 今天就跟大家聊一聊,为什么用了CRM系统后不仅没能提升业绩,反而拖了后腿,这是踩了哪些坑?以及该怎么调整才能真正发挥CRM…

功率MOS管的参数说明

图解功率MOS管的每一个参数!最大额定参数 最大额定参数,所有数值取得条件(Ta=25℃)VDSS 最大漏-源电压 在栅源短接,漏-源额定电压(VDSS)是指漏-源未发生雪崩击穿前所能施加的最大电压。根据温度的不同,实际雪崩击穿电压可能低于额定VDSS。关于V(BR)DSS的详细描述请参见静电…

安卓系统远程控制电脑方法,手机远控教程,ToDesk工具

不知道大家有没有觉得手机、平板虽然很好用,却也仍存在有很多替代不了电脑的地方。 就比如说撰写文档、做数据报表啥的就不如PC端操作般方便,就跟别说PS修图、AE视频剪辑等需高性能设备来带动才易用的了。 好在也是有对策可解决,装个ToDesk远程控制工具便能实现各设备的互联…

功率器件热设计基础(八)——利用瞬态热阻计算二极管浪涌电流

上一篇讲了两种热等效电路模型,Cauer模型和Foster模型,这一篇以二极管的浪涌电流为例,讲清瞬态热阻曲线的应用。/ 前言 / 功率半导体热设计是实现IGBT、碳化硅SiC高功率密度的基础,只有掌握功率半导体的热设计基础知识,才能完成精确热设计,提高功率器件的利用率,降低系统…