延时任务通知服务的设计及实现(五)-- Netty时间轮HashedWheelTimer

一、背景

前文我们讲述了如何使用redisson的RDelayedQueue以及JDK的DelayQueue实现延迟队列,本文将补充一种实现延迟队列的方式–Netty的时间轮HashedWheelTimer。
它和JDK的DelayQueue一样,都存在着以下问题:

  • 占jvm内存,数据量大的时候,可能会导致OOM
  • 机器重启,内存中的延迟队列丢失
  • 需要解决分布式部署的问题

针对第一个问题,我们的解决办法是:

  • 1、设置热数据的时间范围,拉取通知时间将近的任务放入JVM内存,而非全部任务。热数据才存入延迟队列(我们称之为热数据),这样大大减少jvm内存。未到时间线的任务,存储在数据库中,我们称之为冷数据。
  • 2、使用分布式定时任务,对多个节点进行轮询,每次拉取100个任务放入延迟队列。通过轮询,不至于让任务都存放在同一个jvm内存。

针对第二个问题,我们的解决办法是:

  • 在应用重启后,查询热数据,将之存入延迟队列

针对第三个问题,我们的应对方法是:

  • 1、冷数据转换为热数据,使用分布式的定时任务。也就是说,同一个时间中,同一个任务只会存入某一个jvm内存。
  • 2、每次把任务放入延迟队列的时候,需要发布广播消息,由其他节点订阅,从延迟队列中移除该任务。

最后一点,我们在把任务放入延迟队列前,必须先删除jvm内存的任务。这样,防止同一个任务被重复存放至同一个jvm中的延迟队列。

另外,我们需要在任务执行的时候,判断期望执行时间是否和延迟队列中的任务执行时间一致,如果不一致,任务则忽略执行。

二、Netty时间轮HashedWheelTimer

本文许多部分和上文JDK的延迟队列DelayQueue 的许多流程类似,所以下面拣选重点描述。

1、定义任务

  • NettyTimerJob.java

import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;import java.util.Date;/*** @author xxx*/
@Slf4j
@Data
public class NettyTimerJob implements TimerTask {/*** 通知任务*/private NotifyTaskService notifyTaskService;/*** 交易流水号*/private String transNo;/*** 到期时间*/private Date expireDate;public NettyTimerJob(String transNo, Date expireDate, NotifyTaskService notifyTaskService) {this.transNo = transNo;this.expireDate = expireDate;this.notifyTaskService = notifyTaskService;}@Overridepublic void run(Timeout timeout) throws Exception {if (log.isInfoEnabled()) {log.info("开始执行延迟队列中的任务,transNo={},expireDate={}", transNo, expireDate);}// 异步执行你的操作(任务的执行)notifyTaskService.handleTask(transNo, expireDate);}
}

2、封装时间轮

删除时间轮中的任务,需要使用Map存储任务唯一标识对应的Timeout对象,调用timeout.cancel()取消任务。

  • CustomHashedWheelTimer.java
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;/*** @author xxx*/
public class CustomHashedWheelTimer {private final HashedWheelTimer timer;private final Map<String, Timeout> map;public CustomHashedWheelTimer(HashedWheelTimer timer) {this.timer = timer;this.map = new ConcurrentHashMap<>();}public void put(String taskId, NettyTimerJob job, long delay, TimeUnit unit) {// 如果任务已存在,则删除旧任务,防止重复添加this.remove(taskId);Timeout timeout = timer.newTimeout(job, delay, unit);// 将Timeout对象存储到Map中map.put(taskId, timeout);}public boolean remove(String taskId) {// 从Map中移除对应的Timeout对象Timeout timeout = map.remove(taskId);if (timeout != null) {// 取消任务return timeout.cancel();}return false;}
}
  • NettyTimerSingleton.java

import io.netty.util.HashedWheelTimer;
import io.netty.util.concurrent.DefaultThreadFactory;import java.util.concurrent.TimeUnit;/*** @author xxx*/
public class NettyTimerSingleton {private static volatile CustomHashedWheelTimer hashedWheelTimer;private NettyTimerSingleton() {}public static CustomHashedWheelTimer getInstance() {if (hashedWheelTimer == null) {synchronized (DelayQueueSingleton.class) {if (hashedWheelTimer == null) {hashedWheelTimer = new CustomHashedWheelTimer(new HashedWheelTimer(new DefaultThreadFactory("netty-timer"),100,TimeUnit.MILLISECONDS,512,true));}}}return hashedWheelTimer;}
}

3、操作延迟队列

  • 保存任务至延迟队列(生产者)
if (DateUtil.offsetMinute(new DateTime(), commonConfig.getHotDataTimeLine()).after(event.getNotifyDate())) {final long delay = DateUtil.between(event.getNotifyDate(), new DateTime(), DateUnit.SECOND);NettyTimerSingleton.getInstance().put(event.getTransNo(),new NettyTimerJob(event.getTransNo(), event.getNotifyDate(), notifyTaskService),delay,TimeUnit.SECONDS);success = true;
}
  • 删除延迟队列的任务
final String transNo = event.getTransNo();NettyTimerSingleton.getInstance().remove(transNo);

三、总结

Netty时间轮和JDK的DelayQueue的最大区别是,你不需一直轮询延迟队列中的任务是否到期。
它的任务执行是在io.netty.util.TimerTask的run()方法中,所以我们在封装任务的时候,需要把NotifyTaskService对象放入,以便在任务执行的时候调用。

和JDK的DelayQueue一样,你需要使用一个Map集合持久化延迟队列,因为每次将任务放入延迟队列前,你都必须先删除之前的任务。否则同一个任务会在延迟队列中保存多份,一来浪费jvm内存,二来会导致任务的重复执行。(虽然我们在任务执行的时候,会对任务进行兜底,期望执行时间与延迟队列中的任务时间进行对比)

最后说一下,我们使用的netty版本是 4.1.43.Final,具体的依赖关系见下:

<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.2.4.RELEASE</version><relativePath/> <!-- lookup parent from repository -->
</parent><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

spring-boot-starter-data-redis依赖lettuce,后者又依赖netty,所以无需额外引入netty。

    <dependency><groupId>io.lettuce</groupId><artifactId>lettuce-core</artifactId><version>5.2.1.RELEASE</version><scope>compile</scope></dependency>

在这里插入图片描述

当然,你如果引入了redisson框架,也无需额外引入netty框架。
由此也见,netty框架的使用范围之广。。。

附:相关系列文章链接

延时任务通知服务的设计及实现(一)-- 设计方案

延时任务通知服务的设计及实现(二)-- redisson的延迟队列RDelayedQueue

延时任务通知服务的设计及实现(三)-- JDK的延迟队列DelayQueue

延时任务通知服务的设计及实现(四)-- webhook执行任务

延时任务通知服务的设计及实现(五)-- Netty时间轮HashedWheelTimer

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/671827.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[Android]四大组件简介

在 Android 开发中&#xff0c;“四大组件”&#xff08;Four Major Components&#xff09;是指构成 Android 应用程序的四种核心组件&#xff0c;它们通过各自的方式与系统交互&#xff0c;实现应用的多样功能。这些组件是&#xff1a;Activity、Service、Broadcast Receiver…

IOS自动化—将WDA打包ipa批量安装驱动

前言 CSDN&#xff1a; ios自动化-Xcode、WebDriverAgent环境部署 ios获取原生系统应用的包 如果Mac电脑没有配置好Xcode相关环境,可以参考以上文章。 必要条件 Mac电脑&#xff0c;OS版本在12.4及以上&#xff08;低于这个版本无法安装Xcode14&#xff0c;装不了Xcode14就…

纯血鸿蒙APP实战开发——自定义视图实现Tab效果

介绍 本示例介绍使用Text、List等组件&#xff0c;添加点击事件onclick,动画&#xff0c;animationTo实现自定义Tab效果。 效果预览图 使用说明 点击页签进行切换&#xff0c;选中态页签字体放大加粗&#xff0c;颜色由灰变黑&#xff0c;起到强调作用&#xff0c;同时&…

STM32编译前置条件配置

本文基于stm32f104系列芯片&#xff0c;记录编程代码前需要的操作&#xff1a; 添加库文件 在ST官网下载标准库STM32F10x_StdPeriph_Lib_V3.5.0&#xff0c;解压后&#xff0c;得到以下界面 启动文件 进入Libraries&#xff0c;然后进入CMSIS&#xff0c;再进入CM3&#xff…

excel公式后面加的““是什么意思呢?

这个大体上有两种用意。 1.将数值转换成文本 VLOOKUP(F2,A:C,3,0) 举个使用VLOOKUP函数的场景&#xff0c;如下图所示&#xff0c;员工信息表A:C区域中&#xff0c;A列员工号是文本型数字&#xff0c;使用VLOOKUP函数查询找的时候&#xff0c;F列的员工号数值型、文本型都有…

Linux学习(一)-- 简单的认识

目录 1. Linux的诞生 2.Linux发行版 拓展&#xff1a; &#xff08;1&#xff09;什么是Linux系统的内核&#xff1f; &#xff08;2&#xff09;什么是Linux系统发行版&#xff1f; 1. Linux的诞生 Linux创始人: 林纳斯 托瓦兹 Linux 诞生于1991年&#xff0c;作者上大学…

【copilot 使用指南 - @workspace】

为什么需要workspace 默认情况下&#xff0c;copilot只能分析当前文件中的代码内容&#xff0c; 那么如何让copliot 跨文件分析&#xff0c;分析整个项目&#xff0c;分析整个代码目录下的代码&#xff0c;就要用到workspace&#xff0c;举例 &#xff1a;假设如下代码 index…

2025第23届太原煤炭(能源)工业技术与装备展览会

第二十三届太原煤炭&#xff08;能源&#xff09;工业技术与装备展览会 邀 请 函 指导单位&#xff1a; 中国煤炭工业协会 主办单位&#xff1a;山西省煤炭工业协会 承办单位&#xff1a;太原奇新展览有限公司 展览时间&#xff1a;2025年4月22-24日 展览地点&#xff1a…

k8s集群Grafana精选dashboard页面

文章目录 参考文档 Grafana自选模板推荐模板&#xff1a;13332、13824、14518Grafana默认配置我们选择 Node Exporter/Nodes 的 Dashboard 进去&#xff1a;点击 Kubernetes/Networking/Cluster 进去使用模板查看结果 Grafana接入Prometheus数据Grafana添加监控模板导入 1860_r…

获取转转数据,研究完转转请求,tx在算法方面很友好。

本篇文章仅供学习讨论。 文章中涉及到的代码、实例&#xff0c;仅是个人日常学习研究的部分成果。 如有不当&#xff0c;请联系删除。 在研究完阿里的算法以后&#xff08;其实很难说研究完&#xff0c;还有很多内容没有研究透&#xff0c;只能说暂时告一段落&#xff09;&…

bfs之走迷宫

文章目录 走迷宫广度优先遍历代码Java代码打印路径 走迷宫 给定一个 nm 的二维整数数组&#xff0c;用来表示一个迷宫&#xff0c;数组中只包含 0或 1&#xff0c;其中 0表示可以走的路&#xff0c;1表示不可通过的墙壁。 最初&#xff0c;有一个人位于左上角 (1,1) 处&#…

流畅的python-学习笔记_符合python风格的对象

对象表示形式 查看对象说明&#xff0c;可以通过__repr__和__str__方法&#xff0c;前者主要用于开发者&#xff0c;后者主要用于用户&#xff0c;这两个方法分别对内置函数repr和str函数提供支持 向量类 备选构造方法 classmethod和staticmethod staticmethod用的不是特别…