一、背景
前文我们讲述了如何使用redisson的RDelayedQueue以及JDK的DelayQueue实现延迟队列,本文将补充一种实现延迟队列的方式–Netty的时间轮HashedWheelTimer。
它和JDK的DelayQueue一样,都存在着以下问题:
- 占jvm内存,数据量大的时候,可能会导致OOM
- 机器重启,内存中的延迟队列丢失
- 需要解决分布式部署的问题
针对第一个问题,我们的解决办法是:
- 1、设置热数据的时间范围,拉取通知时间将近的任务放入JVM内存,而非全部任务。热数据才存入延迟队列(我们称之为热数据),这样大大减少jvm内存。未到时间线的任务,存储在数据库中,我们称之为冷数据。
- 2、使用分布式定时任务,对多个节点进行轮询,每次拉取100个任务放入延迟队列。通过轮询,不至于让任务都存放在同一个jvm内存。
针对第二个问题,我们的解决办法是:
- 在应用重启后,查询热数据,将之存入延迟队列
针对第三个问题,我们的应对方法是:
- 1、冷数据转换为热数据,使用分布式的定时任务。也就是说,同一个时间中,同一个任务只会存入某一个jvm内存。
- 2、每次把任务放入延迟队列的时候,需要发布广播消息,由其他节点订阅,从延迟队列中移除该任务。
最后一点,我们在把任务放入延迟队列前,必须先删除jvm内存的任务。这样,防止同一个任务被重复存放至同一个jvm中的延迟队列。
另外,我们需要在任务执行的时候,判断期望执行时间是否和延迟队列中的任务执行时间一致,如果不一致,任务则忽略执行。
二、Netty时间轮HashedWheelTimer
本文许多部分和上文JDK的延迟队列DelayQueue 的许多流程类似,所以下面拣选重点描述。
1、定义任务
- NettyTimerJob.java
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;import java.util.Date;/*** @author xxx*/
@Slf4j
@Data
public class NettyTimerJob implements TimerTask {/*** 通知任务*/private NotifyTaskService notifyTaskService;/*** 交易流水号*/private String transNo;/*** 到期时间*/private Date expireDate;public NettyTimerJob(String transNo, Date expireDate, NotifyTaskService notifyTaskService) {this.transNo = transNo;this.expireDate = expireDate;this.notifyTaskService = notifyTaskService;}@Overridepublic void run(Timeout timeout) throws Exception {if (log.isInfoEnabled()) {log.info("开始执行延迟队列中的任务,transNo={},expireDate={}", transNo, expireDate);}// 异步执行你的操作(任务的执行)notifyTaskService.handleTask(transNo, expireDate);}
}
2、封装时间轮
删除时间轮中的任务,需要使用Map存储任务唯一标识对应的Timeout对象,调用timeout.cancel()取消任务。
- CustomHashedWheelTimer.java
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;/*** @author xxx*/
public class CustomHashedWheelTimer {private final HashedWheelTimer timer;private final Map<String, Timeout> map;public CustomHashedWheelTimer(HashedWheelTimer timer) {this.timer = timer;this.map = new ConcurrentHashMap<>();}public void put(String taskId, NettyTimerJob job, long delay, TimeUnit unit) {// 如果任务已存在,则删除旧任务,防止重复添加this.remove(taskId);Timeout timeout = timer.newTimeout(job, delay, unit);// 将Timeout对象存储到Map中map.put(taskId, timeout);}public boolean remove(String taskId) {// 从Map中移除对应的Timeout对象Timeout timeout = map.remove(taskId);if (timeout != null) {// 取消任务return timeout.cancel();}return false;}
}
- NettyTimerSingleton.java
import io.netty.util.HashedWheelTimer;
import io.netty.util.concurrent.DefaultThreadFactory;import java.util.concurrent.TimeUnit;/*** @author xxx*/
public class NettyTimerSingleton {private static volatile CustomHashedWheelTimer hashedWheelTimer;private NettyTimerSingleton() {}public static CustomHashedWheelTimer getInstance() {if (hashedWheelTimer == null) {synchronized (DelayQueueSingleton.class) {if (hashedWheelTimer == null) {hashedWheelTimer = new CustomHashedWheelTimer(new HashedWheelTimer(new DefaultThreadFactory("netty-timer"),100,TimeUnit.MILLISECONDS,512,true));}}}return hashedWheelTimer;}
}
3、操作延迟队列
- 保存任务至延迟队列(生产者)
if (DateUtil.offsetMinute(new DateTime(), commonConfig.getHotDataTimeLine()).after(event.getNotifyDate())) {final long delay = DateUtil.between(event.getNotifyDate(), new DateTime(), DateUnit.SECOND);NettyTimerSingleton.getInstance().put(event.getTransNo(),new NettyTimerJob(event.getTransNo(), event.getNotifyDate(), notifyTaskService),delay,TimeUnit.SECONDS);success = true;
}
- 删除延迟队列的任务
final String transNo = event.getTransNo();NettyTimerSingleton.getInstance().remove(transNo);
三、总结
Netty时间轮和JDK的DelayQueue的最大区别是,你不需一直轮询延迟队列中的任务是否到期。
它的任务执行是在io.netty.util.TimerTask的run()方法中,所以我们在封装任务的时候,需要把NotifyTaskService对象放入,以便在任务执行的时候调用。
和JDK的DelayQueue一样,你需要使用一个Map集合持久化延迟队列,因为每次将任务放入延迟队列前,你都必须先删除之前的任务。否则同一个任务会在延迟队列中保存多份,一来浪费jvm内存,二来会导致任务的重复执行。(虽然我们在任务执行的时候,会对任务进行兜底,期望执行时间与延迟队列中的任务时间进行对比)
最后说一下,我们使用的netty版本是 4.1.43.Final,具体的依赖关系见下:
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.2.4.RELEASE</version><relativePath/> <!-- lookup parent from repository -->
</parent><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
spring-boot-starter-data-redis依赖lettuce,后者又依赖netty,所以无需额外引入netty。
<dependency><groupId>io.lettuce</groupId><artifactId>lettuce-core</artifactId><version>5.2.1.RELEASE</version><scope>compile</scope></dependency>
当然,你如果引入了redisson框架,也无需额外引入netty框架。
由此也见,netty框架的使用范围之广。。。
附:相关系列文章链接
延时任务通知服务的设计及实现(一)-- 设计方案
延时任务通知服务的设计及实现(二)-- redisson的延迟队列RDelayedQueue
延时任务通知服务的设计及实现(三)-- JDK的延迟队列DelayQueue
延时任务通知服务的设计及实现(四)-- webhook执行任务
延时任务通知服务的设计及实现(五)-- Netty时间轮HashedWheelTimer