延时任务通知服务的设计及实现(三)-- JDK的延迟队列DelayQueue

一、接着上文

上文我们讲述了使用redisson的RDelayedQueue实现分布式延迟队列,本文我们将自己JDK的延迟队列DelayQueue实现。

相比前者的实现,作为进程内的延迟队列,它会遇到许多技术难点:

  • 如何支持分布式的多个节点部署场景
  • 应用重启会恢复延时队列
  • 冷数据如何转换为热数据
  • 如何删除延迟队列中的任务

随后,我们也将提及:

  • 保存任务至延迟队列(生产者)
  • 读取延迟队列中的任务(消费者)

二、设计概要

在这里插入图片描述

  • 冷数据:mysql表中的任务数据

  • 热数据:jdk 延迟队列中的任务

  • 广播事件:删除延迟队列中的任务,发布的是广播事件,可以使用redis topic实现。

  • 本地事件:分布式多节点部署的时候,每个任务只保存在其中一个节点的延迟队列中,可以使用spring事件驱动实现。

  • 延迟队列 DelayQueueJob, 它实现了接口Delayed

包括任务的交易流水号和过期时间(即任务的回调时间)

import lombok.Builder;
import lombok.Data;import java.util.Date;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;/*** @author xxx*/
@Builder
@Data
public class DelayQueueJob implements Delayed {/*** 交易流水号*/private String transNo;/*** 到期时间*/private Date expireDate;public DelayQueueJob(String transNo, Date expireDate) {super();this.transNo = transNo;this.expireDate = expireDate;}/*** 用于队列中排序过期时间** @param o* @return*/@Overridepublic int compareTo(Delayed o) {return Long.valueOf(this.expireDate.getTime()).compareTo(Long.valueOf(((DelayQueueJob) o).expireDate.getTime()));}/*** 用于获取过期时间* 延迟关闭时间 = 过期时间 - 当前时间** @param unit* @return*/@Overridepublic long getDelay(TimeUnit unit) {return this.expireDate.getTime() - System.currentTimeMillis();}
}

三、应用启动流程

解决恢复延迟队列的问题。因为DelayQueue是进程内的,一旦重启,将被销毁。

在这里插入图片描述

import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;import java.util.List;
import java.util.concurrent.TimeUnit;@Slf4j
@Service
@RequiredArgsConstructor
public class ApplicationStartupListener implements ApplicationListener<ApplicationReadyEvent> {@Overridepublic void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {// 实现代码参考上面的流程图}
}

四、定时任务流程

解决冷数据如何转换为热数据的问题,防止延时任务过多导致消耗过多的jvm内存,所以只有回调时间将近的任务才放入延迟队列。

在这里插入图片描述

五、如何删除延迟队列中的任务

删除延迟队列的任务:发送广播消息通知所有的节点,当不是当前节点的时候,执行删除。

if (!NetUtil.getLocalhostStr().equals(ipAddress)) {DelayQueueSingleton.getDelayQueue().remove(transNo);
}

DelayQueueSingletons是一个单例类,详见下:

public class DelayQueueSingleton {private static volatile CustomDelayQueue<DelayQueueJob> delayQueue;private DelayQueueSingleton() {}public static CustomDelayQueue<DelayQueueJob> getDelayQueue() {if (delayQueue == null) {synchronized (DelayQueueSingleton.class) {if (delayQueue == null) {delayQueue = new CustomDelayQueue<>();}}}return delayQueue;}}

这里为了删除延迟队列的任务,我们对DelayQueue进行了重写。


import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;public class CustomDelayQueue<T extends Delayed> {private final DelayQueue<T> queue = new DelayQueue<>();private final Map<String, T> map = new ConcurrentHashMap<>();public boolean put(T task, String taskId) {// 如果任务已存在,则删除旧任务,防止重复添加this.remove(taskId);map.put(taskId, task);return queue.add(task);}public boolean remove(String taskId) {// 先删除map,再删除queueT task = map.remove(taskId);if (task != null) {return queue.remove(task);}return false;}public T take() throws InterruptedException {return queue.take();}
}

六、保存任务至延迟队列(生产者)


// 如果通知时间在一定时间范围内
if (DateUtil.offsetMinute(new DateTime(), commonConfig.getHotDataTimeLine()).after(event.getNotifyDate())) {DelayQueueSingleton.getDelayQueue().put(DelayQueueJob.builder().transNo(event.getTransNo()).expireDate(event.getNotifyDate()).build(), event.getTransNo());}

七、读取延迟队列中的任务(消费者)

作为延迟队列的消费者,它的实现和上一篇文章实现类似。不同的是take()获取任务不一样。

String transNo = null;
Date notifyDate = null;DelayQueueJob job = DelayQueueSingleton.getDelayQueue().take();
if (null != job) {transNo = job.getTransNo();notifyDate = job.getExpireDate();
}if (null == transNo) {return;
}if (log.isInfoEnabled()) {log.info("开始执行延迟队列中的任务,transNo={},notifyDate={}", transNo, notifyDate);
}// 异步执行你的操作
notifyTaskService.handleTask(transNo, notifyDate);

八、总结

作为进程内的延迟队列,在多点部署的分布式集群环境下, 代码明显比上一篇要复杂得多。

它们都需要的步骤是:

  • 任务的生产
  • 任务的消费
  • 移除任务

DelayQueue额外多出来的步骤是:

  • 应用启动的时候拉取回调时间将近的未完成任务(更新marked标记为true,防止重复拉取冷数据)
  • 定时拉取未标记且回调时间将近的未完成任务(和上面必须是互斥,等待上一步执行完成,否则会导致重复拉取)
  • 删除延迟队列DelayQueue的任务,必须发布广播消息给全部节点。(引入广播消息机制)

由此可见,任务表的字段marked仅供DelayQueue使用,防止重复拉取数据库的任务到热数据区。

    @Column(name = "marked", nullable = false, columnDefinition = "TINYINT(1) default 0 COMMENT '是否已标记为热数据'")private Boolean marked;

附:相关系列文章链接

延时任务通知服务的设计及实现(一)-- 设计方案

延时任务通知服务的设计及实现(二)-- redisson的延迟队列RDelayedQueue

延时任务通知服务的设计及实现(三)-- JDK的延迟队列DelayQueue

延时任务通知服务的设计及实现(四)-- webhook执行任务

延时任务通知服务的设计及实现(五)-- Netty时间轮HashedWheelTimer

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/675068.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于SpringBoot和PostGIS的各省与地级市空间距离分析

目录 前言 一、PostGIS时空库 1、时空表设计 2、空间数据管理与查询 二、后台接口设计 1、ORM层设计与实现 2、业务层设计与实现 3、控制层设计 三、web可视化设计与实现 1、省份范围展示 2、城市距离可视化 3、成果展示 总结 前言 在上一篇博客中基于Java和GDAL实…

纯血鸿蒙APP实战开发——Canvas实现模拟时钟案例

介绍 本示例介绍利用Canvas 和定时器实现模拟时钟场景&#xff0c;该案例多用于用户需要显示自定义模拟时钟的场景。 效果图预览 使用说明 无需任何操作&#xff0c;进入本案例页面后&#xff0c;所见即模拟时钟的展示。 实现思路 本例的的主要实现思路如下&#xff1a; …

热敏电阻怎么进行性能测试?并以LabVIEW为例进行说明

过程也可用于执行热敏电阻测量。RTD和热敏电阻遵循非常相似的功能原理&#xff0c;测量步骤与下面提供的步骤相同。有关热敏电阻的更多信息&#xff0c;请参阅本文档。 查找设备引脚排列 在连接任何信号之前&#xff0c;请找到您的设备引脚排列。 打开NI MAX并展开设备和接口。…

ENVI下实现遥感矿物蚀变信息提取

蚀变岩石是在热液作用影响下&#xff0c;使矿物成分、化学成分、结构、构造等发生变化的岩石。由于它们经常见于热液矿床的周围&#xff0c;因此被称为蚀变围岩&#xff0c;蚀变围岩是一种重要的找矿标志。利用围岩蚀变现象作为找矿标志已有数百年历史&#xff0c;发现的大型金…

做好源代码防泄密的10条准则

#深度好文计划# 近年来&#xff0c;电脑以及互联网应用在中国的普及和发展&#xff0c;已经深入到社会每个角落&#xff0c; 政府&#xff0c;经济&#xff0c;军事&#xff0c;社会&#xff0c;文化和人们生活等各方面都越来越依赖于电脑和网络。企业需要花费大量的时间精力去…

vivado Zynq UltraScale+ MPSoC 比特流设置

Zynq UltraScale MPSoC 比特流设置 下表所示 Zynq UltraScale MPSoC 器件的器件配置设置可搭配 set_property <Setting> <Value> [current_design] Vivado 工具 Tcl 命令一起使用。

SolidWorks进行热力学有限元分析二、模型装配

1.先打开软件&#xff0c;新建装配体 2.选中你要装配的零件&#xff0c;直接导入就行 3.鼠标点击左键直接先放进去 4.开始装配&#xff0c;点配合 5.选择你要接触的两个面&#xff0c;鼠标右键确定&#xff0c;然后把剩下的面对齐一下就行了 6.搞定

java入门详细教程——day01

目录 1. Java入门 1.1 Java是什么&#xff1f; 1.2 Java语言的历史 1.3 Java语言的分类 1.4 Java语言的特点 1.4.1 先编译再解释运行 1.4.2 跨平台 1.5 JRE和JDK&#xff08;记忆&#xff09; 1.6 JDK的下载和安装&#xff08;应用&#xff09; 1.6.1 下载 1.6.2 安…

《QT实用小工具·五十三》会跑走的按钮

1、概述 源码放在文章末尾 该项目实现了会逃跑的按钮&#xff1a; 两个按钮&#xff0c;一个为普通按钮&#xff0c;另一个为会跑走的按钮 鼠标移到上面时&#xff0c;立刻跑掉 针对鼠标、键盘、触屏进行优化 随机交换两个按钮的文字、偶尔钻到另一个按钮下面、鼠标移开自…

层级实例化静态网格体组件:开启大量模型处理之门

前言 在数字孪生的世界里&#xff0c;我们常常需要构建大量的模型来呈现真实而丰富的场景。然而&#xff0c;当使用静态网格体 &#xff08;StaticMesh &#xff09;构建大量模型时&#xff0c;可能会遇到卡顿的问题&#xff0c;这给我们带来了不小的困扰&#x1f623;。那么&…

【Three.js基础学习】15.scroll-based-animation

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 前言 课程要点 结合html等场景 做滚动动画 1.遇到的问题&#xff0c; 在向下滚动时&#xff0c;下方会显白&#xff08;部分浏览器&#xff09; 解决&#xff1a;alpha:true …

网络1--通信过程的理解

1.封装与解包 通信的过程就是不断的封装和解包的过程 封装即就是按照“应用”“传输” “网络” “链路” 层&#xff0c;封装给每一层都加上相应的包头&#xff08;每一层都有协议&#xff0c;&#xff09;解包就是接受到的包文被一层层去掉相对应的包头。 任何一层的协议都…