一、接着上文
上文我们讲述了使用redisson的RDelayedQueue实现分布式延迟队列,本文我们将自己JDK的延迟队列DelayQueue实现。
相比前者的实现,作为进程内的延迟队列,它会遇到许多技术难点:
- 如何支持分布式的多个节点部署场景
- 应用重启会恢复延时队列
- 冷数据如何转换为热数据
- 如何删除延迟队列中的任务
随后,我们也将提及:
- 保存任务至延迟队列(生产者)
- 读取延迟队列中的任务(消费者)
二、设计概要
-
冷数据:mysql表中的任务数据
-
热数据:jdk 延迟队列中的任务
-
广播事件:删除延迟队列中的任务,发布的是广播事件,可以使用redis topic实现。
-
本地事件:分布式多节点部署的时候,每个任务只保存在其中一个节点的延迟队列中,可以使用spring事件驱动实现。
-
延迟队列 DelayQueueJob, 它实现了接口Delayed
包括任务的交易流水号和过期时间(即任务的回调时间)
import lombok.Builder;
import lombok.Data;import java.util.Date;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;/*** @author xxx*/
@Builder
@Data
public class DelayQueueJob implements Delayed {/*** 交易流水号*/private String transNo;/*** 到期时间*/private Date expireDate;public DelayQueueJob(String transNo, Date expireDate) {super();this.transNo = transNo;this.expireDate = expireDate;}/*** 用于队列中排序过期时间** @param o* @return*/@Overridepublic int compareTo(Delayed o) {return Long.valueOf(this.expireDate.getTime()).compareTo(Long.valueOf(((DelayQueueJob) o).expireDate.getTime()));}/*** 用于获取过期时间* 延迟关闭时间 = 过期时间 - 当前时间** @param unit* @return*/@Overridepublic long getDelay(TimeUnit unit) {return this.expireDate.getTime() - System.currentTimeMillis();}
}
三、应用启动流程
解决恢复延迟队列的问题。因为DelayQueue是进程内的,一旦重启,将被销毁。
import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;import java.util.List;
import java.util.concurrent.TimeUnit;@Slf4j
@Service
@RequiredArgsConstructor
public class ApplicationStartupListener implements ApplicationListener<ApplicationReadyEvent> {@Overridepublic void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {// 实现代码参考上面的流程图}
}
四、定时任务流程
解决冷数据如何转换为热数据的问题,防止延时任务过多导致消耗过多的jvm内存,所以只有回调时间将近的任务才放入延迟队列。
五、如何删除延迟队列中的任务
删除延迟队列的任务:发送广播消息通知所有的节点,当不是当前节点的时候,执行删除。
if (!NetUtil.getLocalhostStr().equals(ipAddress)) {DelayQueueSingleton.getDelayQueue().remove(transNo);
}
DelayQueueSingletons是一个单例类,详见下:
public class DelayQueueSingleton {private static volatile CustomDelayQueue<DelayQueueJob> delayQueue;private DelayQueueSingleton() {}public static CustomDelayQueue<DelayQueueJob> getDelayQueue() {if (delayQueue == null) {synchronized (DelayQueueSingleton.class) {if (delayQueue == null) {delayQueue = new CustomDelayQueue<>();}}}return delayQueue;}}
这里为了删除延迟队列的任务,我们对DelayQueue进行了重写。
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;public class CustomDelayQueue<T extends Delayed> {private final DelayQueue<T> queue = new DelayQueue<>();private final Map<String, T> map = new ConcurrentHashMap<>();public boolean put(T task, String taskId) {// 如果任务已存在,则删除旧任务,防止重复添加this.remove(taskId);map.put(taskId, task);return queue.add(task);}public boolean remove(String taskId) {// 先删除map,再删除queueT task = map.remove(taskId);if (task != null) {return queue.remove(task);}return false;}public T take() throws InterruptedException {return queue.take();}
}
六、保存任务至延迟队列(生产者)
// 如果通知时间在一定时间范围内
if (DateUtil.offsetMinute(new DateTime(), commonConfig.getHotDataTimeLine()).after(event.getNotifyDate())) {DelayQueueSingleton.getDelayQueue().put(DelayQueueJob.builder().transNo(event.getTransNo()).expireDate(event.getNotifyDate()).build(), event.getTransNo());}
七、读取延迟队列中的任务(消费者)
作为延迟队列的消费者,它的实现和上一篇文章实现类似。不同的是take()获取任务不一样。
String transNo = null;
Date notifyDate = null;DelayQueueJob job = DelayQueueSingleton.getDelayQueue().take();
if (null != job) {transNo = job.getTransNo();notifyDate = job.getExpireDate();
}if (null == transNo) {return;
}if (log.isInfoEnabled()) {log.info("开始执行延迟队列中的任务,transNo={},notifyDate={}", transNo, notifyDate);
}// 异步执行你的操作
notifyTaskService.handleTask(transNo, notifyDate);
八、总结
作为进程内的延迟队列,在多点部署的分布式集群环境下, 代码明显比上一篇要复杂得多。
它们都需要的步骤是:
- 任务的生产
- 任务的消费
- 移除任务
DelayQueue额外多出来的步骤是:
- 应用启动的时候拉取回调时间将近的未完成任务(更新marked标记为true,防止重复拉取冷数据)
- 定时拉取未标记且回调时间将近的未完成任务(和上面必须是互斥,等待上一步执行完成,否则会导致重复拉取)
- 删除延迟队列DelayQueue的任务,必须发布广播消息给全部节点。(引入广播消息机制)
由此可见,任务表的字段marked仅供DelayQueue使用,防止重复拉取数据库的任务到热数据区。
@Column(name = "marked", nullable = false, columnDefinition = "TINYINT(1) default 0 COMMENT '是否已标记为热数据'")private Boolean marked;
附:相关系列文章链接
延时任务通知服务的设计及实现(一)-- 设计方案
延时任务通知服务的设计及实现(二)-- redisson的延迟队列RDelayedQueue
延时任务通知服务的设计及实现(三)-- JDK的延迟队列DelayQueue
延时任务通知服务的设计及实现(四)-- webhook执行任务
延时任务通知服务的设计及实现(五)-- Netty时间轮HashedWheelTimer