直播短视频源码,延迟任务的解决方法

news/2024/11/15 1:14:23/文章来源:https://www.cnblogs.com/yunbaomengnan/p/18423564

直播短视频源码,延迟任务的解决方法

在直播短视频源码中,我们有时候会遇到这样的场景,比如下单之后超过30分钟未支付自动取消订单,还有就比如过期/生效通知等等,这些场景一般有两种方法解决:
第一种可以通过定时任务扫描符合条件的去执行;
第二种就是提前通过消息队列发送延迟消息到期自动消费。

本文我要介绍的就是通过第二种方式来实现这种业务逻辑。

一、延迟队列RDelayedQueue的简单用法

生产者端

1、通过redissonClient的getBlockingDeque方法指定队列名称获得RBlockingDeque对象
2、然后再通过redissonClient的getDelayedQueue方法传入RBlockingDeque对象获得RDelayedQueue对象
3、最后调用RDelayedQueue对象的offer方法就可以将消息指定延迟时间发送到延迟队列了

@Component
public class DelayQueueKit {// 注入RedissonClient实例
    @Resourceprivate RedissonClient redissonClient;/*** 添加消息到延迟队列** @param queueCode 队列唯一KEY* @param msg       消息* @param delay     延迟时间* @param timeUnit  时间单位*/public <T> void addDelayQueue(String queueCode, T msg, long delay, TimeUnit timeUnit) {RBlockingDeque<T> blockingDeque = redissonClient.getBlockingDeque(queueCode);RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);// 这一步通过offer插入到队列
        delayedQueue.offer(msg, delay, timeUnit);}
}

 

消费者端

1、通过redissonClient获取RBlockingDeque对象
2、通过RBlockingDeque对象获取RDelayedQueue
3、之后RBlockingDeque再通过自旋调用take方法获取到期的消息,没有消息时会阻塞的。
Tip:一般情况下我们在直播短视频源码刚启动时异步开一个线程去自旋消费队列消息的

@Component
public class DelayQueueKit {// 注入RedissonClient实例
    @Resourceprivate RedissonClient redissonClient;public <T> void consumeQueueMsg(String queueCode) {RBlockingDeque<T> delayQueue = redissonClient.getBlockingDeque(queueCode);RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);log.info("【队列-{}】- 监听队列成功", queueCode);while (true) {T message = null;try {message = delayQueue.take();// 处理自己的业务
                handleMessage(message);log.info("【队列-{}】- 处理元素成功 - ele = {}", queueCode, ele);} catch (Exception e) {log.error("【队列-{}】- 处理元素失败 - ele = {}", queueCode, ele, e);}}}
}

 

二、数据结构设计

Redission实现延迟队列消息用到了四个数据结构:

 

redisson_delay_queue_timeout:{queue_name} 定期队列,ZSET结构(value为消息,score为过期时间),这样就可以知道当前过期的消息。
redisson_delay_queue:{queue_name} 顺序队列,LIST结构,按照消息添加顺序存储,移除消息时可以按照添加顺序删除。
redisson_delay_queue_channel:{queue_name} 发布订阅channel主题,用于通知客户端定时器从定期队列转移到期的消息到目标队列。
{queue_name} 目标队列,LIST结构,存储实际到期可以被消费的消息供消费者拉取消费。

三、消息生产源码分析

1、通过redissonClient.getDelayedQueue获取RDelayedQueue对象

2、然后delayedQueue调用offer方法去保存消息

3、最后真正的保存逻辑是由RedissonDelayedQueue执行offerAsync方法调用的lua脚本

public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelayedQueue<V> {@Overridepublic RFuture<Void> offerAsync(V e, long delay, TimeUnit timeUnit) {if (delay < 0) {throw new IllegalArgumentException("Delay can't be negative");}long delayInMs = timeUnit.toMillis(delay);// 消息过期时间 = 当前时间 + 延迟时间long timeout = System.currentTimeMillis() + delayInMs;// 生成随机id,应该是为了允许插入到zset重复的消息long randomId = ThreadLocalRandom.current().nextLong();// 执行脚本return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID,// 将消息打包成二进制的, 打包的消息 = 随机数 + 消息,有了随机数意味着消息就可以重复"local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);"// 将 打包的消息和过期时间 插入redisson_delay_queue_timeout队列+ "redis.call('zadd', KEYS[2], ARGV[1], value);"// 顺序插入redisson_delay_queue队列+ "redis.call('rpush', KEYS[3], value);"// 如果刚插入的消息就是timeout队列的最前面,即刚插入的消息最近要到期+ "local v = redis.call('zrange', KEYS[2], 0, 0); "+ "if v[1] == value then "// 发布消息通知客户端消息到期时间,让它定期执行转移操作+ "redis.call('publish', KEYS[4], ARGV[1]); "+ "end;",Arrays.<Object>asList(getName(), timeoutSetName, queueName, channelName),// 三个参数:1-过期时间 2-随机数 3-消息
            timeout, randomId, encode(e));}
}

 

四、定时器转移消息源码分析

大家如果仅仅使用而没有看过源码的可能不太容易知道redission究竟哪里执行的定时器去定时转移到期消息的,我也是最近看源码才知道,其实就是在调用redissonClient.getDelayedQueue获取RDelayedQueue对象时创建的:

1、通过redissonClient.getDelayedQueue获取RDelayedQueue对象

2、然后会执行RedissonDelayedQueue的构造函数方法

3、在这个构造方法里就会新建QueueTransferTask这个对象去执行转移操作

public class Redisson implements RedissonClient {@Overridepublic <V> RDelayedQueue<V> getDelayedQueue(RQueue<V> destinationQueue) {if (destinationQueue == null) {throw new NullPointerException();}// 执行RedissonDelayedQueue构造方法return new RedissonDelayedQueue<V>(queueTransferService, destinationQueue.getCodec(), connectionManager.getCommandExecutor(), destinationQueue.getName());}
}
public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelayedQueue<V> {protected RedissonDelayedQueue(QueueTransferService queueTransferService, Codec codec, final CommandAsyncExecutor commandExecutor, String name) {...QueueTransferTask task = new QueueTransferTask(commandExecutor.getConnectionManager()) {@Overrideprotected RFuture<Long> pushTaskAsync() {return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,// 从redisson_delay_queue_timeout队列获取100个到期的消息"local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); "+ "if #expiredValues > 0 then "+ "for i, v in ipairs(expiredValues) do "// 将包装的消息执行解包操作,随机数 + 原消息        + "local randomId, value = struct.unpack('dLc0', v);"// 将原消息插入到{queue_name}队列,就可以被消费了        + "redis.call('rpush', KEYS[1], value);"+ "redis.call('lrem', KEYS[3], 1, v);"+ "end; "// 转移后redisson_delay_queue_timeout队列也移除这些消息        + "redis.call('zrem', KEYS[2], unpack(expiredValues));"+ "end; "// 从定时队列获取最近到期时间然后供定时器到时间再执行+ "local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); "+ "if v[1] ~= nil then "+ "return v[2]; "+ "end "+ "return nil;",Arrays.<Object>asList(getName(), timeoutSetName, queueName),System.currentTimeMillis(), 100);}// 主题redisson_delay_queue_channel:{queue_name}注册发布/订命令执行阅监听器
            @Overrideprotected RTopic getTopic() {return new RedissonTopic(LongCodec.INSTANCE, commandExecutor, channelName);}};// 将定时器命令执行逻辑注册到发布/订阅主题,这样就可以在收到订阅时执行转移操作了
        queueTransferService.schedule(queueName, task);...}
}

 

五、消息消费源码分析

消息消费的逻辑就比较简单了,从RBlockingDeque使用take方法获取消息时,直接调用的就是redis中List的BLPOP命令。

Redis Blpop 命令移出并获取列表的第一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。

public class RedissonBlockingQueue<V> extends RedissonQueue<V> implements RBlockingQueue<V> {@Overridepublic RFuture<V> takeAsync() {// 执行redis中List的BLPOP命令,从{queue_name}队列阻塞取出元素return commandExecutor.writeAsync(getName(), codec, RedisCommands.BLPOP_VALUE, getName(), 0);}
}

 

以上就是直播短视频源码,延迟任务的解决方法, 更多内容欢迎关注之后的文章

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/800856.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

opencascade Bnd_BoundSortBox源码学习 包围盒

opencascade Bnd_BoundSortBox 包围盒前言 一个工具,用于将一个包围盒或一个平面与一组包围盒进行比较。它会对这组包围盒进行排序,生成与被比较元素相交的盒子的列表。这些被排序的盒子通常包围着一组形状,而被比较的盒子则包围了一个需要比较的形状。因此,最终得到的相交…

短视频软件源码,为数据安全建立起坚实的防线

短视频软件源码,为数据安全建立起坚实的防线 保证数据安全是当今互联网时代的重要任务。为了应对日益复杂的网络攻击,行为验证码应运而生。行为验证码通过分析用户在网站上的行为模式,识别正常用户并阻止恶意活动。 它不仅提供了更强大的身份确认方式,还能有效减少伪造身份…

信息收集第二波

情境 参加了培训的第五次培训, 涉及到了更多的 信息收集基础工具, 感觉自己没有好好学, 没学到, 没学懂, 有点难受, 一点都不优雅…… 在强力压缩饼干作用下的简而言之, 这周我有幸瞻和接触到了 这些工具和内容: Nmap, 敏感信息收集, 信息深度收集, AWVS, X-Scan, AppScan, Xra…

Nexpose 6.6.270 发布下载,新增功能概览

Nexpose 6.6.270 发布下载,新增功能概览Nexpose 6.6.270 for Linux & Windows - 漏洞扫描 Rapid7 Vulnerability Management, release Sep 18, 2024 请访问原文链接:https://sysin.org/blog/nexpose-6/,查看最新版。原创作品,转载请保留出处。 作者主页:sysin.org您的…

opencascade Bnd_Box源码学习 包围盒

opencascade Bnd_Box 包围盒 前言 描述一个三维空间中的包围盒一个包围盒与坐标系的轴线平行。如果它是有限的,则由三个区间定义:[Xmin, Xmax], [Ymin, Ymax], [Zmin, Zmax]。一个包围盒在一个或多个方向上可能是无限的(即开放的)。它被称为:OpenXmin 如果它在“X方向”…

用户验收测试指南6计划

6 计划 我们需要像开始任何重要工作一样开始我们的 UAT 工作--决定我们要实现的目标是什么。当我们开始进行 UAT 时,您可能会认为这应该已经很明确了,但请记住,变化是计划的魔咒。很多事情都会偏离最初的计划和要求--有偶然的,也有蓄意的。此时此刻,我们必须最终确定我们认…

【文化课学习笔记】【物理】电场

高中物理学习笔记:电场【物理】电场 前置知识 绝缘体:本质是物体内部电荷无法自由移动。 导体:本质是物体内部电荷可以自由移动。 电荷的移动:导体内部能够发生自由移动的电荷只有负电荷。 显电性:显示的电性,是内部的正负电荷中和之后的结果,不是一定带有几个单位的正电…

Shiro-721—漏洞分析(CVE-2019-12422)

Shiro-721漏洞的简单分析与总结(CVE-2019-12422)目录Padding Oracle Attack 原理PKCS5填充怎么爆破攻击漏洞原理源码分析漏洞复现本文基于shiro550漏洞基础上分析,建议先看上期内容: https://blog.csdn.net/weixin_60521036/article/details/142373353Padding Oracle Attack …

node环境搭建、npm及pnpm安装

1.背景最近换了笔记本,重新搭建了环境,顺手记录下脚本之类的,后续再遇到懒得一个个文件夹创建了。2.node及npm安装 2.1 解压安装 我习惯安装的是解压版:点击此处下载下载完成后,会得到压缩包,解压到指定位置即可。例如,我放在了: D:\toolkit\node\20.17.0解压后的文件中…

pnpm-配置环境目录(win脚本)

1.背景最近换了笔记本,重新搭建了环境。装完node后一般咱们会换到pnpm,这里记录下配置pnpm环境的脚本,懒得一个个文件夹创建了。文件夹名称 作用.pnpm-bin-dir 存放全局安装的可执行文件路径,方便在命令行中直接调用这些工具。.pnpm-cache 用于存储下载的包的缓存,加速后续…

大数据与人工智能-平台搭建准备之VM虚拟机与centos网络配置

一.前提(前提可以不看): 准备好需要的JDK,HADOOP,HIVE……等一些列组建安装包。 rpm -ivh –nodeps xxxx.rpm 可以强制安装本地xxxx软件包 为了提高虚拟机运行速度,可以关闭Cent os7的图形化界面:查看默认的target,执行:systemctl get-default开机以命令模式启动,执行…

游戏技术

目录显示相关的术语每个帧的像素:分辨率多个帧的刷新:刷新率、帧率每个像素的颜色编码码率显卡渲染技术DLSS2 牺牲画质 提高帧率DLSS3 进一步提高帧率 刷新更流畅 显示相关的术语 每个帧的像素:分辨率 分辨率 = 水平宽度的像素数(列数) x 垂直高度的像素数(行数)速记 分辨率…