分布式任务调度框架XXL-JOB详解

分布式任务调度

概述

场景: 如12306网站根据不同车次设置放票时间点,商品成功发货后向客户发送短信提醒等任务,某财务系统需要在每天上午10天前统计前一天的账单数据

任务的调度是指系统为了完成特定业务,基于给定的时间点,时间间隔,执行次数等条件自动执行某个任务

  • 多线程: 充分利用单机的资源
  • 分布式加多线程: 使用多台计算机且每台计算机使用多线程处理,可扩展性更强

分布式任务调度是指将任务调度程序分布式构建而不再是将任务调度的程序都集成在某个服务中,这样可以大大提高任务的调度处理能力

  • 并行任务调度:由于一台计算机CPU的处理能力是有限的,将任务调度程序分布式部署可以让多台计算机共同去完成任务调度,
  • 高可用:若某一个实例宕机不影响其他实例来执行任务
  • 弹性扩容:将任务分割为若干个分片并由不同的实例并行执行,根据情况向集群中增加实例来提高任务调度的处理效率
  • 任务管理与监测:对系统中存在的所有定时任务进行统一的管理及监测,让开发人员及运维人员能够时刻了解任务执行情况,从而做出快速的应急处理响应
  • 避免任务重复执行(相同任务在多个运行实例上只执行一次):当任务调度以集群方式部署时,同一个任务调度程序可能会执行多次,如重复发放了多次优惠券

在这里插入图片描述

传统实现方式

多线程方式实现: 开启一个线程,每sleep一段时间就去检查是否已到预期执行时间

public static void main(String[] args) {    // 任务执行的间隔时间final long timeInterval = 1000;Runnable runnable = new Runnable() {public void run() {while (true) {//TODO:somethingtry {Thread.sleep(timeInterval);} catch (InterruptedException e) {e.printStackTrace();}}}};Thread thread = new Thread(runnable);thread.start();
}

Timer方式实现: 每个Timer对应一个线程,因此可以同时启动多个Timer并行执行多个任务,同一个Timer中的任务是串行执行

public static void main(String[] args){  Timer timer = new Timer();  timer.schedule(new TimerTask(){@Override  public void run() {  //TODO:something}  }, 1000, 2000);  //1秒后开始调度,每2秒执行一次
}

ScheduledExecutor方式实现: 每一个被调度的任务都会由线程池中一个线程去执行,因此任务是并发执行的相互之间不会受到干扰

public static void main(String [] agrs){ScheduledExecutorService service = Executors.newScheduledThreadPool(10);service.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {//TODO:somethingSystem.out.println("todo something");}}, 1,2, TimeUnit.SECONDS);
}

调度框架实现

传统方式缺点: 仅能完成基于开始时间与重复间隔的任务调度,对于复杂的调度需求无法完成

  • 复杂需求: 如设置每月第一天凌晨1点执行任务、复杂调度任务的管理、任务间传递数据

Quartz任务调度框架: 既支持简单的按时间间隔调度,也能通过CronTrigger表达式(秒/分/时/日/月/周/)设置按日历进行任务调度

  • JobDetail:负责定义需要执行的任务逻辑
  • Trigger:负责设置调度策略
  • Scheduler: 将二者组装在一起,并触发任务开始执行
public static void main(String [] agrs) throws SchedulerException {// 创建一个SchedulerSchedulerFactory schedulerFactory = new StdSchedulerFactory();Scheduler scheduler = schedulerFactory.getScheduler();// 创建JobDetailJobBuilder jobDetailBuilder = JobBuilder.newJob(MyJob.class);jobDetailBuilder.withIdentity("jobName","jobGroupName");JobDetail jobDetail = jobDetailBuilder.build();// 创建触发的CronTrigger支持按日历调度CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity("triggerName", "triggerGroupName").startNow().withSchedule(CronScheduleBuilder.cronSchedule("0/2 * * * * ?")).build();scheduler.scheduleJob(jobDetail,trigger);scheduler.start();
}
// 定义需要执行的任务逻辑
public class MyJob implements Job {@Overridepublic void execute(JobExecutionContext jobExecutionContext){System.out.println("todo something");}
}

XXL-JOB框架

概述

XXL-JOB是一个轻量级分布式任务调度平台,学习简单、轻量级、易扩展,主要有调度中心、执行器、任务三部分组成

  • 调度中心:负责管理调度信息,按照调度配置发出调度请求,自身不承担业务代码

  • 任务执行器:负责接收调度中心发出的调度请求即要执行哪个任务,接收到任务后会放入线程池中的任务队列,任务执行完后将执行结果上报

  • 任务:执行处理具体业务的逻辑代码

在这里插入图片描述

docker部署

使用容器进行集群部署xxl-job-admin任务调度中心,通过nginx对其做负载均衡访问,对http://调度中心服务IP地址:端口/xxl-job-admin/进行代理

# 拉取镜像
docker pull xuxueli/xxl-job-admin
# 启动容器
docker run -p 8080:8080 -v /tmp:/data/applogs --name xxl-job-admin  -d xuxueli/xxl-job-admin:{版本号}# 自定义数据源
docker run -e PARAMS="--spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai" -p 8080:8080 -v /tmp:/data/applogs --name xxl-job-admin  -d xuxueli/xxl-job-admin:{指定版本}# 编写脚本启动xxl-job调度中心
sh /data/soft/restart.sh

搭建XXL-JOB工程

第一步从GitHub或Gitee下载XXL-JOB工程,我这里使用的是2.3.1版本

在这里插入图片描述

第二步:执行官方提供的doc目录下的tables_xxl_job.sql数据库脚本创建xxl_job_admin工程需要的数据库表
在这里插入图片描述

第三步:在xxl-job-admin工程的配置文件application.properties中设置端口号和数据库,然后启动服务并使用浏览器访问,账号和密码为admin/123456

在这里插入图片描述

第四步:启动成功之后也可以使用maven命令将xxl-job-admin工程打包然后将其上传至Linux中运行方便访问

# 启动项目
nohup java -jar /绝对路径/xxl-job-admin-2.3.1.jar &

在这里插入图片描述

添加并配置执行器

第一步:进入xxl-job调度中心中提前声明一个执行器,注意要想真正创建一个执行器还是要在Java程序中

第二步:在媒资理模块的media-service工程中添加xxl-job-core依赖,这里面将调度中心和执行器的工作流程都提前写好了

<dependency><groupId>com.xuxueli</groupId><!--项目的父工程已约定了版本2.3.1--><artifactId>xxl-job-core</artifactId>
</dependency>

第三步:在Nacos中的dev环境下创建media-service-dev.yaml配置文件指定执行器将要注册的调度中心地址和启动端口号

  • 配置执行器的应用名时需要与调度中心添加的执行器名称相同,双方会根据名称建立绑定关系
xxl:job:admin: addresses: http://192.168.101.65:8088/xxl-job-adminexecutor:appname: media-process-service # 执行器的应用名与调度中心中添加的名称相同address: ip: port: 9999 # 执行器启动的端口,如果本地启动多个执行器注意端口不能重复logpath: /data/applogs/xxl-job/jobhandlerlogretentiondays: 30accessToken: default_token

第三步:将xxl-job-executor-sample-springboot工程下的XxlJobConfig拷贝到媒资管理模块的mdeia-service工程的config包下

  • XxlJobConfig配置类中提供了创建的执行器对应的Bean,我们需要将其注册到容器中
<!--引入依赖-->
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-commons</artifactId><version>${version}</version>
</dependency>
@Configuration
public class XxlJobConfig {private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);@Value("${xxl.job.admin.addresses}")private String adminAddresses;@Value("${xxl.job.accessToken}")private String accessToken;@Value("${xxl.job.executor.appname}")private String appname;@Value("${xxl.job.executor.address}")private String address;@Value("${xxl.job.executor.ip}")private String ip;@Value("${xxl.job.executor.port}")private int port;@Value("${xxl.job.executor.logpath}")private String logPath;@Value("${xxl.job.executor.logretentiondays}")private int logRetentionDays;// 执行器对应的Bean@Beanpublic XxlJobSpringExecutor xxlJobExecutor() {logger.info(">>>>>>>>>>> xxl-job config init.");XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();xxlJobSpringExecutor.setAdminAddresses(adminAddresses);xxlJobSpringExecutor.setAppname(appname);xxlJobSpringExecutor.setAddress(address);xxlJobSpringExecutor.setIp(ip);xxlJobSpringExecutor.setPort(port);xxlJobSpringExecutor.setAccessToken(accessToken);xxlJobSpringExecutor.setLogPath(logPath);xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);return xxlJobSpringExecutor;}
}

第四步:测试执行器与调度中心是否正常通信,通过启动媒资管理模块的接口工程(依赖service工程)启动执行器,然后查看调度中心的执行器管理界面

在这里插入图片描述

添加并配置任务

常见任务相关的基础,调度,任务配置
在这里插入图片描述

配置选项
调度类型固定速度: 按固定的间隔定时调度
Cron: 通过Cron表达式(秒/分/时/日/月/周)实现更丰富的定时调度策略,xxl-job也提供了图形界面去配置
30 10 1 * * ? 指定每天1点10分30秒触发,0/30 * * * * ? 每30秒触发一次
运行模式BEAN: 在项目工程中编写执行器的任务代码
GLUE: 在调度中心的任务参数中编写任务代码
JobHandler(任务方法名)在项目工程中使用@XxlJob注解指定的任务方法名称
路由策略(即当执行器集群部署时,调度中心向哪个执行器下发任务)第一个: 只向第一个执行器下发任务

第一步:参考xxl-job-executor-sample-springboot工程下的任务类,在媒资管理模块中media-service工程的service包下新建jobhandler存放任务类

注解作用
@XxlJobvalue=“自定义jobhandler名称,与在调度中心新建任务时指定的JobHandler属性值一致”
init = “JobHandler初始化方法”
destroy = “JobHandler销毁方法”)
XxlJobHelper.log打印的执行日志
@Component
@Slf4j
public class SampleJob {private static Logger logger = LoggerFactory.getLogger(SampleXxlJob.class);// 默认任务结果为成功状态,不需要主动设置,也可以通过 "XxlJobHelper.handleFail/handleSuccess" 自主设置任务结果// 简单任务@XxlJob("testJob")public void testJob() throws Exception {// 任务执行逻辑log.info("开始执行.....");}// 分片广播任务@XxlJob("shardingJobHandler")public void shardingJobHandler() throws Exception {}// 命令行任务@XxlJob("commandJobHandler")public void commandJobHandler() throws Exception {}// 跨平台Http任务@XxlJob("httpJobHandler")public void httpJobHandler() throws Exception {}// 生命周期任务,任务初始化与销毁时,支持自定义相关逻辑@XxlJob(value = "demoJobHandler2", init = "init", destroy = "destroy")public void demoJobHandler2() throws Exception {XxlJobHelper.log("XXL-JOB, Hello World.");}public void init(){logger.info("init");}public void destroy(){logger.info("destroy");}}

第二步:在任务调度中心的任务管理界面添加任务并设置相关的配置执行任务的执行器名称

在这里插入图片描述

第三步:在调度中心的任务管理界面启动/停止添加的任务,然后通过调度日志查看执行器执行任务的情况,任务执行时间后注意清理日志

在这里插入图片描述

第四步:测试任务方法的执行,启动媒资管理模块的api工程(依赖service工程)启动执行器,等待执行器处理任务调度中心下发的任务,查看任务方法的执行情况

在这里插入图片描述

任务的高级配置

分布式任务处理就是启动多个执行器组成一个集群去执行任务,此时调度中心就需要考虑执行器在集群部署下执行任务的策略,保证任务高效执行且不重复

在这里插入图片描述

高级配置选项
路由策略FIRST: 固定选择第一个机器
LAST:固定选择最后一个机器
ROUND: 轮询
RANDOM: 随机选择在线的机器
CONSISTENT_HASH(一致性HASH): 按照Hash算法计算任务Id对应的Hash值,最终所有任务均匀散列在不同机器上
LEAST_FREQUENTLY_USED: 使用频率最低的机器优先被选举
LEAST_RECENTLY_USED: 最久未使用的机器优先被选举
FAILOVER(故障转移): 如果执行器集群中某一台机器故障,按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度
BUSYOVER(忙碌转移): 如果执行器集群中某一台机器正在执行任务,按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度
SHARDING_BROADCAST(分片广播): 广播触发对应集群中所有机器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务
子任务(对于关联的任务,实现一个任务执行完成去执行另一个任务)每个任务都拥有一个唯一的任务ID(任务ID可以从任务列表获取),当本任务执行结束并且执行成功时,将会触发并主动调度一次子任务ID所对应的任务
调度过期策略忽略: 调度过期后执行器会忽略过期的任务,从当前时间开始重新计算下次触发时间
立即执行一次: 调度过期后会立即执行一次,并从当前时间开始重新计算下次触发时间
阻塞处理策略(调度过于密集时执行器来不及处理时的策略)单机串行(默认): 调度请求进入单机执行器后,调度请求进入FIFO队列并以串行方式运行
丢弃后续调度: 调度请求进入单机执行器后,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记为失败
覆盖之前调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,将会终止当前运行中的调度任务并清空队列,然后运行请求的调度任务;
任务超时时间(支持自定义)任务运行超时将会主动中断任务
失败重试次数(支持自定义)当任务失败时将会按照预设的失败重试次数主动进行重试

路由策略之分片广播测试

分片: 调度中心以执行器为维度进行分片,将集群中的每个执行器都标上序号

  • 分片任务场景: 10个执行器的集群来处理10w条数据,每台机器只需要处理1w条数据,耗时降低10倍

广播: 每次调度会向集群中的所有执行器发送任务调度请求

  • 广播任务场景:让所有执行器同时运行shell脚本、集群所有节点进行缓存更新等
    在这里插入图片描述

每个执行器接收到调度请求的同时会接收分片参数(序号和总数),基于当前业务需求使用分片参数控制执行器是否执行任务,控制执行器集群分布式处理任务

  • 分片参数: 执行器可以根据分片参数动态的领取属于自己的任务,保证执行器直接不会执行重复的任务,充分发挥每个执行器的能力
  • xxl-job支持动态扩容执行器集群,当任务量增加时可以部署更多的执行器到集群中,此时调度中心会动态增加分片的数量

第一步:定义作业分片的任务方法获取分片参数,然后在任务调度中心添加对应的任务方法shardingJobHandler并设置路由策略为分片广播

在这里插入图片描述

// 分片广播任务
@XxlJob("shardingJobHandler")
public void shardingJobHandler() throws Exception {// 分片序号,从0开始int shardIndex = XxlJobHelper.getShardIndex();// 分片总数   int shardTotal = XxlJobHelper.getShardTotal();log.info("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);log.info("开始执行第"+shardIndex+"批任务");
}

第二步:在Nacos的dev环境下media-service-dev.yaml配置文件中增加本地优先配置

# 本地优先
spring:cloud:config:override-none: true

第三步:启动媒资管理模块的接口工程并复制一份执行器实例,添加vm参数选项指定新实例的启动端口号执行器的启动端口号,执行器的名称不变

# Dserver.port=63051 -Dxxl.job.executor.port=9998对应如下配置项
server: port: 53051
xxl:job:executor:port: 9998

在这里插入图片描述

第四步:观察任务调度中心,查看已经启动的执行器,调度中心会根据实际情况动态调整执行器的总分片数

在这里插入图片描述

第五步:在调度中心启动任务,观察执行器实例接收下发任务的执行情况,查看接收的分片序号和总分片数

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/451605.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

情人节适合送哪些礼物?2024年情人节送礼指南大推荐!

情人节即将来临&#xff0c;这是一年一度表达爱意的时刻。在这个特殊的日子里&#xff0c;送上一份精心挑选的礼物&#xff0c;将会让爱意更加深刻。但是&#xff0c;肯定会有朋友会困惑于选择哪种礼物能够最好地表达您的心意。不用担心&#xff0c;今天小编就为大家精心准备了…

图书管理系统设计

工程链接放在这里 https://download.csdn.net/download/Samature/88805507使用 注意事项 登录 登录有初始账号&#xff1a;yzb 密码&#xff1a;123123123 后续可以自己加 保存的用户信息位置和题目 library是图书馆内容 users是用户名 可能遇到的bug 暂无&#xff0c;有的…

【Python】【完整代码】解析Excel 文件中的内容并检查是否包含某字符串,并返回判断结果

示例&#xff1a; 开发需求&#xff1a;解析Excel 文件中的内容并检查是否包含 "Fail" 字符&#xff0c;若没有则返回True&#xff0c;若有则返回False 实现代码&#xff1a; #!/usr/bin/env python3 # -*- encoding: utf-8 -*-File : check_excel_for_fail.py Ti…

filebeat采集中断与变慢问题分析

4、未采集的那段时间内无以下日志&#xff0c;这段时间内数据源正常&#xff0c;应能被正常采集到。 5、相关进程资源&#xff0c;服务器磁盘、cpu、内存无明显异常。 6、日志中断前有如下报错。 2022-02-15T15:22:22.2230800 INFO log/harvester.go:254 Harvester started fo…

this指针详细总结 | static关键字 | 静态成员

文章目录 1.this指针引入2.this指针的特性3.静态成员3.1.C语言中static的基本用法3.2.C中的static关键字 1.this指针引入 class student { public:student(const string& name){ _name name; }void print(){// _name<>this->_name<>(*this)._name// 说一下…

YOLOv5算法进阶改进(15)— 引入密集连接卷积网络DenseNet

前言:Hello大家好,我是小哥谈。DenseNet(密集连接卷积网络)是一种深度学习神经网络架构,它在2017年由Gao Huang等人提出。DenseNet的核心思想是通过密集连接(dense connection)来促进信息的流动和共享。在传统的卷积神经网络中,每个层的输入只来自于前一层的输出。而在…

Python爬虫从基础到入门:数据接口实战--获取豆瓣阅读热度最高的书籍信息

接着上一篇文章&#xff1a;Python爬虫从基础到入门&#xff1a;找数据接口&#xff0c;接下来实战一下&#xff0c;以获取豆瓣阅读这个网站热度最高的书籍信息为例&#xff0c;网址为&#xff1a;豆瓣阅读 Python爬虫从基础到入门&#xff1a;数据接口实战--获取豆瓣阅读热度…

Java自救手册

目录 访问地址 访问地址&#xff0c;发现不通&#xff0c;无法访问&#xff1a; 网络不通一般有两种情况&#xff1a; Maven 拿Maven 拿到Maven以后 Maven单独的报红 Git git注意&#xff1a; 目录 访问地址 访问地址&#xff0c;发现不通&#xff0c;无法访问&…

(南京观海微电子)——TFT闪屏分析

LCD显示屏闪屏的原因&#xff1a;屏蔽线圈&#xff1b; 信号干扰&#xff1b; 硬件; 刷新频率设置&#xff1b; 监控时间过长&#xff1b; 频率太高&#xff1b; 类似于光源的频率。 一、TFT液晶屏本身的频率太高导致闪屏 TFT液晶屏本身的频率太高导致了闪屏&#xff0c;不过…

MySQL-事务(TRANSACTION)

文章目录 1. 事务概述2. 事务的四大特性&#xff08;ACID&#xff09;3. 控制事务4. 并发事务产生的问题5. 事务的隔离级别6. 拓展6.1 InnoDB如何解决幻读&#xff1f;6.2 MySQL实现事务的原理&#xff1f; 1. 事务概述 定义&#xff1a;数据库的事务&#xff08; Transaction…

小华和小为的聚餐地点 - 华为OD统一考试

OD统一考试&#xff08;C卷&#xff09; 分值&#xff1a; 200分 题解&#xff1a; Java / Python / C 题目描述 小华和小为是很要好的朋友&#xff0c;他们约定周末一起吃饭。 通过手机交流&#xff0c;他们在地图上选择了多个聚餐地点(由于自然地形等原因&#xff0c;部分聚…

1 月 30 日算法练习-数论

唯一分解定理 唯一分解定理指的是&#xff1a;对于任意一个>1的正整数&#xff0c;都可以以唯一的一种方式分解为若干质因数的乘积。 x p 1 k 1 ⋅ p 2 k 2 ⋅ … ⋅ p m k m x p_1^{k_1} \cdot p_2^{k_2} \cdot \ldots \cdot p_m^{k_m} xp1k1​​⋅p2k2​​⋅…⋅pmkm​…