直播短视频源码,延迟任务的解决方法
在直播短视频源码中,我们有时候会遇到这样的场景,比如下单之后超过30分钟未支付自动取消订单,还有就比如过期/生效通知等等,这些场景一般有两种方法解决:
第一种可以通过定时任务扫描符合条件的去执行;
第二种就是提前通过消息队列发送延迟消息到期自动消费。
本文我要介绍的就是通过第二种方式来实现这种业务逻辑。
生产者端
1、通过redissonClient的getBlockingDeque方法指定队列名称获得RBlockingDeque对象
2、然后再通过redissonClient的getDelayedQueue方法传入RBlockingDeque对象获得RDelayedQueue对象
3、最后调用RDelayedQueue对象的offer方法就可以将消息指定延迟时间发送到延迟队列了
@Component public class DelayQueueKit {// 注入RedissonClient实例 @Resourceprivate RedissonClient redissonClient;/*** 添加消息到延迟队列** @param queueCode 队列唯一KEY* @param msg 消息* @param delay 延迟时间* @param timeUnit 时间单位*/public <T> void addDelayQueue(String queueCode, T msg, long delay, TimeUnit timeUnit) {RBlockingDeque<T> blockingDeque = redissonClient.getBlockingDeque(queueCode);RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);// 这一步通过offer插入到队列 delayedQueue.offer(msg, delay, timeUnit);} }
消费者端
1、通过redissonClient获取RBlockingDeque对象
2、通过RBlockingDeque对象获取RDelayedQueue
3、之后RBlockingDeque再通过自旋调用take方法获取到期的消息,没有消息时会阻塞的。
Tip:一般情况下我们在直播短视频源码刚启动时异步开一个线程去自旋消费队列消息的
@Component public class DelayQueueKit {// 注入RedissonClient实例 @Resourceprivate RedissonClient redissonClient;public <T> void consumeQueueMsg(String queueCode) {RBlockingDeque<T> delayQueue = redissonClient.getBlockingDeque(queueCode);RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);log.info("【队列-{}】- 监听队列成功", queueCode);while (true) {T message = null;try {message = delayQueue.take();// 处理自己的业务 handleMessage(message);log.info("【队列-{}】- 处理元素成功 - ele = {}", queueCode, ele);} catch (Exception e) {log.error("【队列-{}】- 处理元素失败 - ele = {}", queueCode, ele, e);}}} }
Redission实现延迟队列消息用到了四个数据结构:
redisson_delay_queue_timeout:{queue_name} 定期队列,ZSET结构(value为消息,score为过期时间),这样就可以知道当前过期的消息。
redisson_delay_queue:{queue_name} 顺序队列,LIST结构,按照消息添加顺序存储,移除消息时可以按照添加顺序删除。
redisson_delay_queue_channel:{queue_name} 发布订阅channel主题,用于通知客户端定时器从定期队列转移到期的消息到目标队列。
{queue_name} 目标队列,LIST结构,存储实际到期可以被消费的消息供消费者拉取消费。
1、通过redissonClient.getDelayedQueue获取RDelayedQueue对象
2、然后delayedQueue调用offer方法去保存消息
3、最后真正的保存逻辑是由RedissonDelayedQueue执行offerAsync方法调用的lua脚本
public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelayedQueue<V> {@Overridepublic RFuture<Void> offerAsync(V e, long delay, TimeUnit timeUnit) {if (delay < 0) {throw new IllegalArgumentException("Delay can't be negative");}long delayInMs = timeUnit.toMillis(delay);// 消息过期时间 = 当前时间 + 延迟时间long timeout = System.currentTimeMillis() + delayInMs;// 生成随机id,应该是为了允许插入到zset重复的消息long randomId = ThreadLocalRandom.current().nextLong();// 执行脚本return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID,// 将消息打包成二进制的, 打包的消息 = 随机数 + 消息,有了随机数意味着消息就可以重复"local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);"// 将 打包的消息和过期时间 插入redisson_delay_queue_timeout队列+ "redis.call('zadd', KEYS[2], ARGV[1], value);"// 顺序插入redisson_delay_queue队列+ "redis.call('rpush', KEYS[3], value);"// 如果刚插入的消息就是timeout队列的最前面,即刚插入的消息最近要到期+ "local v = redis.call('zrange', KEYS[2], 0, 0); "+ "if v[1] == value then "// 发布消息通知客户端消息到期时间,让它定期执行转移操作+ "redis.call('publish', KEYS[4], ARGV[1]); "+ "end;",Arrays.<Object>asList(getName(), timeoutSetName, queueName, channelName),// 三个参数:1-过期时间 2-随机数 3-消息 timeout, randomId, encode(e));} }
大家如果仅仅使用而没有看过源码的可能不太容易知道redission究竟哪里执行的定时器去定时转移到期消息的,我也是最近看源码才知道,其实就是在调用redissonClient.getDelayedQueue获取RDelayedQueue对象时创建的:
1、通过redissonClient.getDelayedQueue获取RDelayedQueue对象
2、然后会执行RedissonDelayedQueue的构造函数方法
3、在这个构造方法里就会新建QueueTransferTask这个对象去执行转移操作
public class Redisson implements RedissonClient {@Overridepublic <V> RDelayedQueue<V> getDelayedQueue(RQueue<V> destinationQueue) {if (destinationQueue == null) {throw new NullPointerException();}// 执行RedissonDelayedQueue构造方法return new RedissonDelayedQueue<V>(queueTransferService, destinationQueue.getCodec(), connectionManager.getCommandExecutor(), destinationQueue.getName());} } public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelayedQueue<V> {protected RedissonDelayedQueue(QueueTransferService queueTransferService, Codec codec, final CommandAsyncExecutor commandExecutor, String name) {...QueueTransferTask task = new QueueTransferTask(commandExecutor.getConnectionManager()) {@Overrideprotected RFuture<Long> pushTaskAsync() {return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,// 从redisson_delay_queue_timeout队列获取100个到期的消息"local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); "+ "if #expiredValues > 0 then "+ "for i, v in ipairs(expiredValues) do "// 将包装的消息执行解包操作,随机数 + 原消息 + "local randomId, value = struct.unpack('dLc0', v);"// 将原消息插入到{queue_name}队列,就可以被消费了 + "redis.call('rpush', KEYS[1], value);"+ "redis.call('lrem', KEYS[3], 1, v);"+ "end; "// 转移后redisson_delay_queue_timeout队列也移除这些消息 + "redis.call('zrem', KEYS[2], unpack(expiredValues));"+ "end; "// 从定时队列获取最近到期时间然后供定时器到时间再执行+ "local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); "+ "if v[1] ~= nil then "+ "return v[2]; "+ "end "+ "return nil;",Arrays.<Object>asList(getName(), timeoutSetName, queueName),System.currentTimeMillis(), 100);}// 主题redisson_delay_queue_channel:{queue_name}注册发布/订命令执行阅监听器 @Overrideprotected RTopic getTopic() {return new RedissonTopic(LongCodec.INSTANCE, commandExecutor, channelName);}};// 将定时器命令执行逻辑注册到发布/订阅主题,这样就可以在收到订阅时执行转移操作了 queueTransferService.schedule(queueName, task);...} }
五、消息消费源码分析
消息消费的逻辑就比较简单了,从RBlockingDeque使用take方法获取消息时,直接调用的就是redis中List的BLPOP命令。
Redis Blpop 命令移出并获取列表的第一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。
public class RedissonBlockingQueue<V> extends RedissonQueue<V> implements RBlockingQueue<V> {@Overridepublic RFuture<V> takeAsync() {// 执行redis中List的BLPOP命令,从{queue_name}队列阻塞取出元素return commandExecutor.writeAsync(getName(), codec, RedisCommands.BLPOP_VALUE, getName(), 0);} }
以上就是直播短视频源码,延迟任务的解决方法, 更多内容欢迎关注之后的文章