引言
基于cron表达式的定时任务实现,因为cron表达式对于每个任务不确定,所以使用线程池来动态的创建和销毁定时任务
依赖
因为使用的spring自带的调度功能,所以没有额外的依赖,我的项目版本为:
使用
首先需要定义一个线程池,使用@configuration
注解配置
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;@Configuration
public class SchedulerConfig {@Beanpublic TaskScheduler taskScheduler() {ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();// 设置线程池大小scheduler.setPoolSize(10);scheduler.setThreadNamePrefix("workshop-scheduled-task-");scheduler.initialize();return scheduler;}
}
这个配置中定义了一个调度的线程池,并且配置了线程池大小、线程名称前缀以及初始化操作
然后实现一个定时管理
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.stereotype.Service; import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;@Slf4j
@Service
public class DynamicScheduledTaskService {@Resourceprivate TaskScheduler taskScheduler;/** * 使用Map来关联任务ID和ScheduledFuture* 如有必要,可以通过Redis等数据库进行管理*/private final Map<String, ScheduledFuture<?>> tasks = Maps.newConcurrentMap();/*** 周期调度执行任务* * @param taskId 任务ID * @param cronExpression cron表达式 * @param task 实际任务 */public void schedulingTask(String taskId, String cronExpression, Runnable task) { log.info("添加定时调度任务:{},cron为:{}", taskId, cronExpression); // 取消已存在的同ID任务ScheduledFuture<?> existingTask = tasks.get(taskId);if (existingTask != null && !existingTask.isCancelled()) { existingTask.cancel(false); } // 包装任务以便在执行完毕后自动取消 Runnable wrappedTask = () -> {log.info("{} 执行定时调度任务:{}", DateUtil.nowToString(), taskId); task.run(); }; // 安排任务并保存其Future ScheduledFuture<?> future = taskScheduler.schedule(wrappedTask, new CronTrigger(cronExpression)); tasks.put(taskId, future); } /*** 定时单次执行调度任务* * @param taskId 任务ID* @param execTime 执行时间* @param task 实际任务*/public void singleScheduleTask(String taskId, LocalDateTime execTime, Runnable task) {log.info("添加定时调度任务:{},执行时间为:{}", taskId, execTime);DateTimeFormatter cronTimeFormatter = DateTimeFormatter.ofPattern("ss mm HH dd MM ?");// 取消已存在的同ID任务ScheduledFuture<?> existingTask = tasks.get(taskId);if (existingTask != null && !existingTask.isCancelled()) {existingTask.cancel(false);}// 包装任务以便在执行完毕后自动取消Runnable wrappedTask = () -> {log.info("{} 执行单次调度任务:{}", LocalDateTime.now(), taskId);try {task.run();} finally { // 无论任务成功还是异常终止,都取消后续执行 this.stopTask(taskId); } }; // 安排任务并保存其Future ScheduledFuture<?> future = taskScheduler.schedule(wrappedTask, new CronTrigger(cronTimeFormatter.format(execTime))); tasks.put(taskId, future);}public void stopTask(String taskId) { ScheduledFuture<?> future = tasks.get(taskId); if (future != null && !future.isCancelled()) { future.cancel(false); tasks.remove(taskId); log.info("{} 停止调度任务:{}", DateUtil.nowToString(), taskId); } } }
此定时管理服务中一共实现了两种情况
- 周期性的执行任务,手动取消才进行取消
- 单次执行任务,执行后自动销毁任务
使用服务样例
@Slf4j
@Service
public class UseScheduleService{@Resourceprivate DynamicScheduledTaskService scheduledTaskService;public void startSchedulingTask(){// 其余逻辑String taskId="taskId";// cron表达式,以从0分钟开始,每隔一分钟执行一次为例String crontab = "0 0/1 * * * ?";scheduledTaskService.schedulingTask(taskId, crontab, () -> {// 此处是执行任务的逻辑});}public void startScheduleTask(){// 其余逻辑String taskId="taskId";// 执行的时间LocalDateTime execTime = LocalDateTime.now();scheduledTaskService.scheduleTask(taskId, execTime, () -> {// 此处是执行任务的逻辑});}public void stopTask(String taskId){scheduledTaskService.stopTask(taskId);}}
这样就完成了定时任务的动态管理
定时管理之外
初始化
在使用定时任务的场景中,我们一般还会有重启服务时候,需要针对重启之前已经在执行的任务进行恢复定时,这里我选择使用Spring的 ApplicationRunner
进行服务启动后执行逻辑的管理
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.ApplicationArguments;@Slf4j
@Service
public class UseScheduleService implements ApplicationRunner{
@Resource
private DynamicScheduledTaskService dynamicScheduledTaskService;@Override public void run(ApplicationArguments args) {log.info("============= 初始化定时任务begin =============");// 获取需要启动的列表List<Object> needStartList = new ArrayList<>();// 进行定时任务的写入needStartList.forEach(i->dynamicScheduledTaskService.startSchedulingTask(i.getTaskId(), i.getCronTab(),() -> {// 执行的逻辑}));log.info("============= 初始化定时任务end =============");}
}
定时任务工具类
我在使用定时任务时,还遇到了需要判定是否是合规的cron表达式以及根据cron表达式获取下次执行时间的需求,封装的工具类如下
import org.springframework.scheduling.support.CronSequenceGenerator; import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Date;public class CronUtils { private CronUtils() { } /** * 判断cron表达式是否无效 * * @param cron cron表达式 * @return 判定结果 */ public static Boolean isInvalidCron(String cron) { return !isValidCron(cron); } /** * 判断cron表达式是否有效 * * @param cron cron表达式 * @return 判定结果 */ public static Boolean isValidCron(String cron) { try { new CronSequenceGenerator(cron); return true; } catch (IllegalArgumentException ex) { return false; } } /** * 获取下次执行时间 * * @param cron cron表达式 * @return 下次执行时间 */ public static LocalDateTime getNextExecTime(String cron) {// 因为CronSequenceGenerator.next接受的入参只有java.util.Date,而我又习惯使用LocalDateTime,因此加入此转换return new CronSequenceGenerator(cron).next(new Date()).toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime(); } }