Powerjob学习记录

news/2025/4/2 8:28:10/文章来源:https://www.cnblogs.com/guitao-gt/p/18802523

一前言
本文章仅为个人学习总结记录,更多内容可直接访问Powerjob官方开源大佬的github或官方在线文档
官方github地址:https://github.com/PowerJob/PowerJob
官方在线文档:https://www.yuque.com/powerjob/guidence/quick_start

二 产品特性
PowerJob(原OhMyScheduler)是全新一代分布式任务调度与计算框架,其主要功能特性如下:
使用简单:提供前端Web界面,允许开发者可视化地完成调度任务的管理(增、删、改、查)、任务运行状态监控和运行日志查看等功能。
定时策略完善:支持 CRON 表达式、固定频率、固定延迟和API四种定时调度策略。
执行模式丰富:支持单机、广播、Map、MapReduce 四种执行模式,其中 Map/MapReduce 处理器能使开发者寥寥数行代码便获得集群分布式计算的能力。
工作流支持:支持在线配置任务依赖关系(DAG),以可视化的方式对任务进行编排,同时还支持上下游任务间的数据传递,以及多种节点类型(判断节点 & 嵌套工作流节点)。
执行器支持广泛:支持 Spring Bean、内置/外置 Java 类,另外可以通过引入官方提供的依赖包,一键集成 Shell、Python、HTTP、SQL 等处理器,应用范围广。
运维便捷:支持在线日志功能,执行器产生的日志可以在前端控制台页面实时显示,降低 debug 成本,极大地提高开发效率。
依赖精简:最小仅依赖关系型数据库(MySQL/PostgreSQL/Oracle/MS SQLServer...)
高可用 & 高性能:调度服务器经过精心设计,一改其他调度框架基于数据库锁的策略,实现了无锁化调度。部署多个调度服务器可以同时实现高可用和性能的提升(支持无限的水平扩展)。
故障转移与恢复:任务执行失败后,可根据配置的重试策略完成重试,只要执行器集群有足够的计算节点,任务就能顺利完成。

三 适用场景
有定时执行需求的业务场景:如每天凌晨全量同步数据、生成业务报表、未支付订单超时取消等。
有需要全部机器一同执行的业务场景:如使用广播执行模式清理集群日志。
有需要分布式处理的业务场景:比如需要更新一大批数据,单机执行耗时非常长,可以使用Map/MapReduce 处理器完成任务的分发,调动整个集群加速计算。
有需要延迟执行某些任务的业务场景:比如订单过期处理等。

四 基本概念
1.分组概念:
appName:应用名称,建议与用户实际接入 PowerJob 的应用名称保持一致,用于业务分组与隔离。一个 appName 等于一个业务集群,也就是实际的一个 Java 项目。

2.核心概念:
任务(Job):描述了需要被 PowerJob 调度的任务信息,包括任务名称、调度时间、处理器信息等。
任务实例( JobInstance,简称 Instance):任务(Job)被调度执行后会生成任务实例(Instance),任务实例记录了任务的运行时信息(任务与任务实例的关系类似于类与对象的关系)。
作业(Task):任务实例的执行单元,一个 JobInstance 存在至少一个 Task,具体规则如下:
o单机任务(STANDALONE):一个 JobInstance 对应一个 Task
o广播任务(BROADCAST):一个 JobInstance 对应 N 个 Task,N为集群机器数量,即每一台机器都会生成一个 Task
oMap/MapReduce任务:一个 JobInstance 对应若干个 Task,由开发者手动 map 产生
工作流(Workflow):由 DAG(有向无环图)描述的一组任务(Job),用于任务编排。
工作流实例(WorkflowInstance):工作流被调度执行后会生成工作流实例,记录了工作流的运行时信息。

3.扩展概念
JVM 容器:以 Maven 工程项目的维度组织一堆 Java 文件(开发者开发的众多 Java 处理器),可以通过前端网页动态发布并被执行器加载,具有极强的扩展能力和灵活性。
OpenAPI:允许开发者通过接口来完成手工的操作,让系统整体变得更加灵活。开发者可以基于 API 便捷地扩展 PowerJob 原有的功能。
轻量级任务:单机执行且不需要以固定频率或者固定延迟执行的任务 (>= v4.2.1)
重量级任务:非单机执行或者以固定频率/延迟执行的任务 (>= v4.2.1)

4.定时任务类型
API:该任务只会由 powerjob-client 中提供的 OpenAPI 接口触发,server 不会主动调度。
CRON:该任务的调度时间由 CRON 表达式指定。
固定频率:秒级任务,每隔多少毫秒运行一次,功能与 java.util.concurrent.ScheduledExecutorService#scheduleAtFixedRate 相同。
固定延迟:秒级任务,延迟多少毫秒运行一次,功能与 java.util.concurrent.ScheduledExecutorService#scheduleWithFixedDelay 相同。
工作流:该任务只会由其所属的工作流调度执行,server 不会主动调度该任务。如果该任务不属于任何一个工作流,该任务就不会被调度。
备注:固定延迟和固定频率任务统称秒级任务,这两种任务无法被停止,只有任务被关闭或删除时才能真正停止任务。

五 本地部署
1.拉取代码
https://gitee.com/KFCFans/PowerJob

2.创建数据库(仅需要创建数据库)
SQL CREATE DATABASE IF NOT EXISTS powerjob-daily DEFAULT CHARSET utf8mb4

3.修改配置文件
oms.env=DAILY
logging.config=classpath:logback-dev.xml

####### 外部数据库配置(需要用户更改为自己的数据库配置) #######
spring.datasource.core.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.core.jdbc-url=jdbc:mysql://localhost:3306/powerjob-daily?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
spring.datasource.core.username=root
spring.datasource.core.password=No1Bug2Please3!
spring.datasource.core.hikari.maximum-pool-size=20
spring.datasource.core.hikari.minimum-idle=5

####### mongoDB配置,非核心依赖,通过配置(仅需要额外持久化日志场景下使用) oms.mongodb.enable=false 来关闭 #######
oms.mongodb.enable=true
spring.data.mongodb.uri=mongodb://localhost:27017/powerjob-daily

####### 邮件配置(不需要邮件报警可以删除以下配置来避免报错) #######
spring.mail.host=smtp.163.com
spring.mail.username=zqq@163.com
spring.mail.password=GOFZPNARMVKCGONV
spring.mail.properties.mail.smtp.auth=true
spring.mail.properties.mail.smtp.starttls.enable=true
spring.mail.properties.mail.smtp.starttls.required=true

####### 资源清理配置 #######
oms.instanceinfo.retention=1
oms.container.retention.local=1
oms.container.retention.remote=-1

####### 缓存配置 #######
oms.instance.metadata.cache.size=1024

####### 用户与权限体系配置 #######
oms.auth.initiliaze.admin.password=powerjob_admin

4.启动
启动tech.powerjob.server.PowerJobServerApplication,访问http://127.0.0.1:7700/

六 功能介绍
1. 系统首页

2.新建任务

任务名称:名称,便于记忆与搜索,无特殊用途,请尽量简短(占用数据库字段空间)
任务描述:描述,无特殊作用,请尽量简短(占用数据库字段空间)
任务参数:任务处理时能够获取到的参数(即各个Processor 的 process 方法入参 TaskContext 对象的 jobParams 属性)(进行一次处理器开发就能理解了)
定时信息:该任务的触发方式,由下拉框和输入框组成
oAPI -> 不需要填写任何参数,表明该任务由 OpenAPI 触发
oCRON -> 填写 CRON 表达式(在线生成网站https://cron.qqe2.com/)
o固定频率 -> 填写整数,单位毫秒
o固定延迟 -> 填写整数,单位毫秒
o每日固定间隔 -> 哪几天的哪些时间段需要执行,比如每周二和三的10点到11点间每10分钟触发一次
o工作流 -> 不需要填写任何参数,表明该任务由工作流(workflow)触发
生命周期:定时策略生效的时间段
执行配置:由执行类型(单机、广播和 MapReduce )、处理器类型和处理器参数组成,后两项相互关联。
o内置Java处理器
方式一 -> 填写该处理器的全限定类名(eg, tech.powerjob.samples.processors.MapReduceProcessorDemo)
方式二 -> 填写 IOC 容器的 bean 名称,比如 Spring 用户可填写 Spring Bean 名称(eg, 处理器使用注解 @Component(value = "powerJobProcessor"),则控制台可填写 powerJobProcessor)
方式三 -> 方法级注解,非 MapReduce 任务可直接使用注解 @PowerJobHandler 将某个方法转化为 PowerJob 任务,并设置唯一入参 TaskContext注入上下文,具体参考代码
@Component(value = "springMethodProcessorService")
public class SpringMethodProcessorService {

/*** 处理器配置方法1: 全限定类名#方法名,比如 tech.powerjob.samples.tester.SpringMethodProcessorService#testEmptyReturn* 处理器配置方法2: SpringBean名称#方法名,比如 springMethodProcessorService#testEmptyReturn* @param context 必须要有入参 TaskContext,返回值可以是 null,也可以是其他任意类型。正常返回代表成功,抛出异常代表执行失败*/
@PowerJobHandler(name = "testEmptyReturn")
public String testEmptyReturn(TaskContext context) {OmsLogger omsLogger = context.getOmsLogger();omsLogger.warn("测试日志");return "响应结果,正常返回视为执行成功,抛出异常视为执行失败"
}

}
oJava容器 -> 填写容器ID#处理器全限定类名(eg,1#cn.edu.zju.oms.container.ContainerMRProcessor)
oSHELL、Python、SQL 、HTTP 等任务的执行:点击查看官方处理器的使用教程
运行配置
o派发策略:默认健康度优先,优先选择性能最优机器进行执行,可选随机均摊等其他派发模式
HEALTH_FIRST:默认策略,健康度优先,会选择 worker 集群中状态最好的机器作为本次任务的主节点
RANDOM:随机
SPECIFY:指定主节点运行,常用于 Map/MapReduce 等场景,大规模计算时,主节点部署/重启会导致任务完全失败,因此可为主节点搭建一个隔离环境,通过该参数指定主节点到该隔离环境运行,使其摆脱普通 worker 节点部署带来的影响。指定语法等同于“执行机器地址”的语法,填写 IP 或者 TAG。
o最大实例数:该任务允许同时执行的数量,0代表不限(默认为 0)
o单机线程并发数:该实例执行过程中每个 Worker 使用的线程数量(MapReduce 任务生效,其余无论填什么,都只会使用必要的线程数...)
o运行时间限制:限定任务的最大运行时间,超时则视为失败,单位毫秒,0 代表不限制超时时间(不建议不限制超时时间)。
重试配置:
oInstance 重试次数:实例级别,失败了整个任务实例重试,会更换 TaskTracker(本次任务实例的Master节点),代价较大,大型Map/MapReduce慎用。
oTask 重试次数:Task 级别,每个子 Task 失败后单独重试,会更换 ProcessorTracker(本次任务实际执行的 Worker 节点),代价较小,推荐使用。
o注:请注意同时配置任务重试次数和子任务重试次数之后的重试放大,比如对于单机任务来说,假如任务重试次数和子任务重试次数都配置了 1 且都执行失败,实际执行次数会变成 4 次!推荐任务实例重试配置为 0 ,子任务重试次数根据实际情况配置。
机器配置:用来标明允许执行任务的机器状态,避开那些摇摇欲坠的机器,0 代表无任何限制。
o最低 CPU 核心数:填写浮点数,CPU 可用核心数小于该值的 Worker 将不会执行该任务。
o最低内存(GB):填写浮点数,可用内存小于该值的 Worker 将不会执行该任务。
o最低磁盘(GB):填写浮点数,可用磁盘空间小于该值的 Worker 将不会执行该任务。
集群配置
o执行机器地址,指定集群中的某几台机器执行任务
IP模式:多值英文逗号分割,如192.168.1.1:27777,192.168.1.2:27777。常用于 debug 等场景,需要指定特定机器运行。
TAG 模式:通过 PowerJobWorkerConfig#tag将执行器打标分组后,可在控制台通过 tag 指定某一批机器执行。常用于分环境分单元执行的场景。如某些任务需要屏蔽安全生产环境(tag 设置为环境标),某些任务只需要在特定单元执行(tag 设置单元标)
o最大执行机器数量:限定调动执行的机器数量
报警配置:
o选择任务执行失败后报警通知的对象,需要事先录入。
o对于秒级任务,支持分别指定错误阈值(C)、统计窗口(S)、沉默窗口(W) 来定制更加丰富的告警机制(表示的是在 S 秒内失败次数达到 C 次就触发告警,并且沉默 W 秒)
日志配置:可使用控制台配置调整 Job 使用的 Logger 及 LogLevel
o支持 SERVER(服务端日志,默认)、LOCAL(本地日志)、STDOUT(系统输出)、NULL(空实现)4种 LogType
o支持 DEBUG、INFO、WARN、ERROR、OFF 5种级别控制
o使用建议:初期调试可使用 SERVER 日志,后续功能稳定后改为 LOCAL,并调高日志级别,降低通讯压力,消除性能瓶颈问题
高级设置
oTaskTracker 行为:
NORMAL:常规行为。不特殊处理,TaskTracker 正常参与集群计算,会导致其负载比常规节点高。适用于节点数不那么多,任务不那么繁重的场景。
PADDLING:划水:只负责管理节点,不参与计算,稳定性最优。适用于节点数量非常多的大规模计算场景,少一个计算节点来换取稳定性提升。

3. 任务运行&运维
支持运行任务时手动指定实例参数


七 正式部署
1. 调度中心(powerjob-server)
1.1配置讲解

1.2部署
基础版流程:
1.下载镜像:docker pull powerjob/powerjob-server:latest

2.创建容器并运行(仅需修-e PARAMS,即传入 SpringBoot 相关配置信息),示例如下
docker run -d
--restart=always
--name powerjob-server
-p 7700:7700 -p 10086:10086 -p 10010:10010
-e TZ="Asia/Shanghai"
-e JVMOPTIONS=""
-e PARAMS="--spring.profiles.active=product --spring.datasource.core.jdbc-url=jdbc:mysql://192.168.1.1:3306/powerjob-product?useUnicode=true&characterEncoding=UTF-8 --spring.datasource.core.username=root --spring.datasource.core.password=root --spring.data.mongodb.uri=mongodb://192.168.1.1:27017/powerjob-product"
-v ~/docker/powerjob-server:/root/powerjob/server -v ~/.m2:/root/.m2
powerjob/powerjob-server:latest

2 执行器(powerjob-worker)初始化

2.1引入依赖

tech.powerjob
powerjob-worker-spring-boot-starter
${latest.powerjob.version}

2.2添加相关的配置项

akka 工作端口,可选,默认 27777

powerjob.worker.akka-port=27777

接入应用名称,用于分组隔离,推荐填写 本 Java 项目名称

powerjob.worker.app-name=my-powerjob-worker

调度服务器地址,IP:Port 或 域名,多值逗号分隔

powerjob.worker.server-address=127.0.0.1:7700,127.0.0.1:7701

通讯协议,4.3.0 开始支持 HTTP 和 AKKA 两种协议,官方推荐使用 HTTP 协议(注意 server 和 worker 都要开放相应端口)

powerjob.worker.protocol=http

持久化方式,可选,默认 disk

powerjob.worker.store-strategy=disk

任务返回结果信息的最大长度,超过这个长度的信息会被截断,默认值 8192

powerjob.worker.max-result-length=4096

单个任务追加的工作流上下文最大长度,超过这个长度的会被直接丢弃,默认值 8192

powerjob.worker.max-appended-wf-context-length=4096

同时运行的轻量级任务数量上限

powerjob.worker.max-lightweight-task-num=1024

同时运行的重量级任务数量上限

powerjob.worker.max-heavy-task-num=64

3 处理器(Processor)开发

3.1基本概念
Java 处理器可根据代码所处位置划分为内置 Java 处理器和外置 Java 处理器,前者直接集成在宿主应用(也就是接入本系统的业务应用)中,一般用来处理业务需求;后者可以在一个独立的轻量级的 Java 工程中开发,通过 JVM 容器技术被 worker 集群热加载,提供 Java 的“脚本能力”,一般用于处理灵活多变的需求。
Java 处理器可根据对象创建者划分为 SpringBean 处理器和普通 Java 对象处理器,前者由 Spring IOC 容器完成处理器的创建和初始化,后者则由 PowerJob 维护其生命周期。如果宿主应用支持 Spring,强烈建议使用 SpringBean 处理器,开发者仅需要将 Processor 注册进 Spring IOC 容器(一个 @Component 注解或一句 bean 配置)即可享受 Spring 带来的便捷之处。
Java处理器可根据功能划分为单机处理器、广播处理器、Map 处理器和 MapReduce 处理器。
o单机处理器(BasicProcessor)对应了单机任务,即某个任务的某次运行只会有某一台机器的某一个线程参与运算。
o广播处理器(BroadcastProcessor)对应了广播任务,即某个任务的某次运行会调动集群内所有机器参与运算。
oMap处理器(MapProcessor)对应了Map任务,即某个任务在运行过程中,允许产生子任务并分发到其他机器进行运算。
oMapReduce 处理器(MapReduceProcessor)对应了 MapReduce 任务,在 Map 任务的基础上,增加了所有任务结束后的汇总统计。

3.2 工作流上下文( WorkflowContext )
该属性是 v4.0.0 版本的重大变更之一,移除了原来的参数传递机制,提供了 API 让开发者可以更加灵活便捷地在工作流中实现信息的传递。

上游任务通过 WorkflowContext#appendData2WfContext(String key,Object value)  方法向工作流上下文中追加数据,下游任务便可以通过 WorkflowContext#fetchWorkflowContext()  方法获取到相应的数据进行消费。注意,当追加的上下文信息的 key 已经存在于当前的上下文中时,新的 value 会覆盖之前的值。另外,每次任务实例追加的上下文数据大小也会受到 worker 的配置项 powerjob.worker.max-appended-wf-context-length 的限制,超过这个长度的会被直接丢弃。

3.3 处理器开发示例
3.3.1单机处理器:BasicProcessor
单机执行的策略下,server 会在所有可用 worker 中选取健康度最佳的机器进行执行。单机执行任务需要实现接口 BasicProcessor,代码示例如下:

3.3.2 广播处理器:BroadcastProcessor
广播执行的策略下,所有机器都会被调度执行该任务。为了便于资源的准备和释放,广播处理器在BasicProcessor 的基础上额外增加了 preProcess 和 postProcess 方法,分别在整个集群开始之前/结束之后选一台机器执行相关方法。代码示例如下:

3.3.3 并行处理器:MapReduceProcessor
MapReduce 是最复杂也是最强大的一种执行器,它允许开发者完成任务的拆分,将子任务派发到集群中其他Worker 执行,是执行大批量处理任务的不二之选!实现 MapReduce 处理器需要继承 MapReduceProcessor类
注:Map 处理器相当于 MapReduce 处理器的阉割版本(阉割了 reduce 方法),此处不再单独举例。
reduce 阶段需要全量读取子任务结果,会对内存一定要求。如果只需要计算,不需要二次统计或自行完成统计,推荐使用 Map 处理器即可,对执行节点性能更友好。

3.3.4 最佳实践:MapReduce 实现静态分片

3.3.5 最佳实践:MapReduce 多级分发处理
利用 MapReduce 实现 Root -> A -> B/C -> Reduce)的 DAG 工作流

3.4 用户(账号体系)配置
3.4.1 用户体系说明
5.x版本(beta版)
PowerJob 用户体系的设计理念,是提供足够的开放性,让开发者能轻松接入企业已有的统一登录体系,降低使用成本,避免 N 个中间件 N 套账号密码的窘境。同时也对有安全性诉求的使用方更友好,可使用自己信赖的登录体系。
在此设计理念驱动下,PowerJob 主框架没有耦合任何一种登录方式,PowerJob 真正的 User 完全依赖第三方账号体系的同步。
举个例子,用户通过钉钉登录后,PowerJob 会感知到此回调,同步创建 accountType=DING,username=用户在钉钉内的唯一ID 的 PowerJob 账户。

3.4.2支持的账号体系
3.4.2.1 PowerJob 账号体系
简单的注册、登录功能
3.4.2.2 钉钉账号体系
3.4.2.3 自定义账号体系
仅需要开发者自行在 powerjob-server 实现 tech.powerjob.server.auth.login.ThirdPartyLoginService接口,适配企业自己的账号体系,即可接入

3.5 报警配置(邮件、WebHook、钉钉、自定义)
3.5.1 邮件报警
在 powerjob-server 中初始化邮件配置

在控制台登陆界面(点击右上角 Setting - 退出 即可前往控制台登陆界面)点击报警用户录入,录入报警用户信息

在任务中配置报警接收人

3.5.2 WebHook
powerjob-server 会向目标 URL 发送包含报警任务内容的 POST 请求,推荐使用 Map<String, Object> 接收并处理:

3.5.3 钉钉报警
STEP1: 创建企业内部应用:PowerJob
1.钉钉开放平台
https://open-dev.dingtalk.com/#/corpeapp
2.官方文档引入示例
https://www.yuque.com/powerjob/guidence/ydg1kv#dB1G2
STEP2: 配置报警信息
1.在控制台登陆界面(点击右上角 Setting - 退出 即可前往控制台登陆界面)点击 报警用户录入
2.录入报警用户信息,其中 手机号需要关联企业钉钉账户。
3.在任务中配置报警接收人。
3.5.4 自定义报警功能
实现 tech.powerjob.server.extension.Alarmable 接口,并将实现类对象的创建交由 Spring 管理(添加 @Service 或 @Component 注解)

八 高级特性
1 容器
1.1 介绍
PowerJob 的容器技术允许开发者开发独立于 Worker 项目之外 Java 处理器,简单来说,就是以 Maven 工程项目的维度去组织一堆 Java 文件(开发者开发的众多脚本处理器),进而兼具开发效率和可维护性。
该容器为 JVM 级容器,而不是操作系统级容器(Docker)

1.2 用途举例
比如,突然出现了某个数据库数据清理任务,与主业务无关,写进原本的项目工程中不太优雅,这时候就可以单独创建一个用于数据操作的容器,在里面完成处理器的开发,通过 PowerJob 的容器部署技术在 Worker 集群上被加载执行。
比如,常见的日志清理啊,机器状态上报啊,对于广大 Java 程序员来说,也许并不是很会写 shell 脚本,此时也可以借用 agent+容器 技术,利用 Java 完成各项原本需要通过脚本进行的操作。

1.3生成容器模版
为了方便开发者使用,最新版本的前端页面已经支持容器工程模版的自动生成,开发者仅需要填入相关信息即可下载容器模版开始开发。

Group:对应 Maven 的 标签,一般填入倒写的公司域名。
Artifact:对应 Maven 的 标签,填入代表该容器的唯一标示。
Name:对应 Maven 的 标签,填入该容器名称。
Package Name:包名,代表了的容器工程内部所使用的包名,警告:包名一旦生成后,请勿更改!否则会导致运行时容器加载错误(当然,如有必须修改包名的需求,可以尝试替换 /resource 下以 oms-worker-container 开头的所有文件相关的值)。
Java Version:容器工程的 Java 版本,请务必与容器目标部署 Worker 平台的 Java 版本保持一致。
注:开发容器工程前,请检查 pom.xml 文件,确保 powerjob.worker.version 与实际部署应用的 worker 版本一致!

1.4 创建容器

目前,PowerJob 支持使用 Git 代码库和 FatJar 来创建容器。创建路径:容器运维 -> 容器管理 -> 新建容器。
当使用 Git 代码库创建容器时,powerjob-server 需要完成代码库的下载、编译、构建和上传,因此需要其运行环境包含可用的 Git 和 Maven 环境(包括私服的访问权限)。
下图为使用Git代码库创建容器的示例,需要填入容器名称和代码库信息等参数:

下图为使用 FatJar 创建容器的示例,需要上传可用的 FatJar(注:FatJar 内必须包含了所有依赖的 Jar 文件)

1.5 部署容器
每次创建和修改容器后,都需要进行一次部署(相当于发布该容器的最新版本),使 powerjob-worker 动态加载容器内的 Processor。点击部署,可以看到详细的部署信息。

2 OpenAPI
2.1 简介
OpenAPI 允许开发者通过接口来完成手工的操作,让系统整体变得更加灵活。开发者可以基于 API 便捷地扩展PowerJob 原有的功能,比如,全面定制自己的任务调度策略。
可以让接入方自己实现 PowerJob 的整个任务管理与调度模块。
2.2依赖

tech.powerjob
powerjob-client
${latest.powerjob.version}

2.3简单示例:
通过 OpenAPI 停止某个任务实例。
// 初始化 client,需要server地址和应用名称作为参数
PowerJobClient client = new PowerJobClient("127.0.0.1:7700", "oms-test", "password");
// 调用相关的API
client.stopInstance(1586855173043L)
2.4 API列表
更多api查看官方在线文档
https://www.yuque.com/powerjob/guidence/olgyf0

3 工作流(workflow)
工作流描述了任务与任务之间的依赖关系,比如我现在有 A、B、C、D 四个任务,我希望 A 任务运行完毕后才开始运行 B、C 任务,最后再运行 D 任务。这就形成了一个依赖关系,可以通过有向无环图(DAG)来描述这个关系,如下图所示。

1 如何使用工作流?
STEP1:录入任务
配置依赖关系的前提是要有任务可以配置,所以首先需要前往任务管理界面,录入相关的任务。

STEP2:配置工作流
点击右上角按钮 新建工作流,即可录入新的工作流,具体界面和说明如下所示。
工作流名称:名称,无实际业务用途,请尽量精简字段
工作流描述:描述,无实际业务用途,请尽量精简字段
定时信息:该工作流的触发方式的触发方式,包含时间表达式类型选择框和时间表达式输入框
oCRON -> 填写 CRON 表达式
oAPI -> 不需要填写任何参数,表明该任务由 OpenAPI 触发
生命周期:定时策略生效的时间段
最大实例:该工作流同时执行的数量
任务依赖关系:提供编辑界面可视化操作,绘制 DAG(有向无环图),配置工作流内各个任务的依赖关系

2 特殊节点说明
2.1 判断节点
判断节点 不允许失败跳过以及禁用,节点参数中存储的是 Groovy 代码(执行 Groovy 代码时会将当前工作流上下文作为 context 变量注入到代码执行的上下文中),其执行结果仅能返回 "true" 或者 "false",同时判断节点仅有且必须有两条“输出”路径。会根据该代码的执行结果决定下游需要执行的节点。
备注:如果需要根据上游节点的执行结果决定下游节点,可以将上游节点的执行结果注入上下文中,再在判断节点中做相应的判断。

2.2 工作流嵌套节点

该节点代表对某个工作流的引用,节点的 jobId 属性存储的是工作流 id,其他属性和普通的任务节点一致。不允许出现循环引用以及多级嵌套的情况,即嵌套节点中指向的工作流一定是一个不含嵌套节点的工作流。
执行到该节点时,如果该节点处于启用状态,那么将启动该节点所引用工作流的一个新实例,待该实例执行完成后再同步更新该节点的状态。
注意,创建子工作流时,会透传当前的上下文作为工作流的实例参数,在子工作流执行完成时会合并子工作流的上下文至父工作流的上下文中。
重试子工作流不会联动重试父工作流,但失败的子工作流会随着父工作流的重试而原地重试(不会生成新的实例)

2.3 一键复制工作流功能

2.4 标记成功重试
DAG 界面左侧上方还有个相对比较隐蔽的功能按钮 📌 标记成功,如下图所示

选中执行失败的节点,点击标记成功后会将其 DAG 中的节点状态置为成功(不会更改对应任务实例的状态,所以点击节点看到的任务实例详情还是失败)

该功能主要用于搭配工作流实例的重试功能实现灵活运维 (想跳过某个失败的节点进行重试)。

4 官方处理器
对于一些通用的任务,PowerJob 官方编写了可开箱即用的 Processor 来方便各位使用!只需要引入以下依赖即可享受所有现成的强大的官方处理器!

tech.powerjob
powerjob-official-processors
${latest.version}

每个官方处理器的详细使用方法请仔细阅读文档,有任何疑惑建议直接阅读源码!
由于 JSON 内传递许多参数涉及到转义,建议先用 Java 代码生成配置(JSONObject#put),再调用 toJSONString 方法生成参数。
4.1 Shell 处理器
全限定类名 tech.powerjob.official.processors.impl.script.ShellProcessor

4.2 Python 处理器
全限定类名 tech.powerjob.official.processors.impl.script.PythonProcessor
注意:Python 处理器会使用机器的 python 命令执行,因此 python 版本需要与本机 python 环境保持一致!

4.3 HTTP 处理器
全限定类名 tech.powerjob.official.processors.impl.HttpProcessor
method【必填字段】:GET / POST / DELETE / PUT
url【必填字段】:请求地址
timeout【可选字段】:超时时间,单位为秒
mediaType【可选字段】:使用非 GET 请求时,需要传递的数据类型,如 application/json
body【可选字段】:使用非 GET 请求时的 body 内容,后端使用 String 接收,如果为 JSON 请注意转义
headers【可选字段】:请求头,后端使用 Map<String, String> 接收

4.4 文件清理处理器
注意:文件删除是高危操作,请慎用该处理器。默认情况下该处理器不可用,需要传入 JVM 参数
-Dpowerjob.official-processor.file-cleanup.enable=true 开启
全限定类名 tech.powerjob.official.processors.impl.FileCleanupProcessor

整体参数为 array,array 中的每个元素为 JSON,描述需要清理的资源,每个节点的参数如下:
dirPath:待删除文件的文件夹目录(会递归查找该目录下所有符合要求的文件)
filePattern:待删除文件名称的 Java 版正则表达式
retentionTime:待删除文件的保留时间,单位为小时(当前时间 - 待删除文件上次编辑时间 > retentionTime 的文件才会被删除),用于保留某些滚动日志,0 代表忽略该规则
1.脚本清理配置

2.日志清理配置

4.5 SQL 处理器
目前内置了两款 SQL 处理器,均支持自定义 SQL 的校验、解析逻辑,主要区别在于数据源连接的获取方式不同。
任务参数(JSON)
dataSourceName:数据源名称,仅对 SpringDatasourceSqlProcesssor 生效,非必填,默认使用 default 数据源
sql:需要执行的 SQL 语句,必填
timeout:SQL 超时时间(秒),非必填,默认值 60
jdbcUrl:jdbc 数据库连接,仅对 DynamicDatasourceSqlProcessor 生效,必填
showResult:布尔值,是否在实例日志中展示 SQL 执行结果,非必填,默认值 false
建议生产环境使用 AbstractSqlProcessor#registerSqlValidator 方法至少注册一个 SQL 校验器拦截掉非法 SQL,比如 truncate、drop 此类危险操作,或者在数据库账号的权限上做管控。如果需要自定义 SQL 解析逻辑,比如 宏变量替换,参数替换 等,则可以通过指定 AbstractSqlProcessor.SqlParser 来实现。

4.5.1 SpringDatasourceSqlProcessor
全限定类名 tech.powerjob.official.processors.impl.sql.SpringDatasourceSqlProcessor

默认情况下在初始化的时候需要至少注入一个数据源,所以必须提前手动初始化并注册到 Spring IOC 容器中,以 SpringBean 的方式进行加载。

允许使用 SpringDatasourceSqlProcessor#registerDataSource 方法注册多个数据源

建议:最好将该 SQL Processor 用的数据库连接池和其他业务模块用的数据库连接池隔离开,不要共用一个连接池!

初始化 SpringDatasourceSqlProcessor 示例代码

参数配置示例
使用默认数据源,执行 SQL : select 'x' from t_example ,限定超时时间为 10 秒,并且在实例日志中展示结果

{
"dataSourceName": "default",
"sql": "select 'x' from t_example",
"timeout": 10,
"showResult": true
}

4.5.2 DynamicDatasourceSqlProcessor
默认情况下该处理器不可用,需要传入 JVM 参数 -Dpowerjob.official-processor.dynamic-datasource.enable=true 开启

全限定类名 tech.powerjob.official.processors.impl.sql.DynamicDatasourceSqlProcessor

支持通过参数动态指定数据源连接,在指定的数据库执行 SQL。

参数配置示例
{
"sql": "select 'x' from t_example",
"timeout": 10,
"showResult": true,
"jdbcUrl":"jdbc:mysql://myhost1:3306/db_name?user=root&password=mypass",
}

4.6 工作流上下文注入处理器
全限定类名 tech.powerjob.official.processors.impl.context.InjectWorkflowContextProcessor ( since v1.2.0 )
该处理器会从任务参数中加载数据,尝试将其解析成 Map ,如果解析成功,则会将其注入到工作流上下文中。
注意,参数必须是一个HashMap<String, Object>的 JSON 串形式,否则会解析失败。
注意:该 Processor 主要用于一些需要注入固定上下文的工作流场景,作为单个任务执行是没有任何意义的
参数配置示例
{
"uid": "powerjob_001",
"createTime": 1662210950000,
}

4.7 动态配置处理器
简易版配置中心,适用于没有 nacos 等配置中心时临时需要动态下发配置的场景。
该处理器原理非常简单,将控制台的 jobParams 写入类本身的 static 变量中,这样开发者可直接通过静态方法获取到 Job 的配置(ConfigProcessor#fetchConfig),起到伪配置中心的效果。
全限定类名:tech.powerjob.official.processors.impl.ConfigProcessor
执行模式:广播(需要保证每台机器都下发配置,必须选广播执行)
参数说明:
public static class Config implements Serializable {
/**
* 原始配置,通过 ConfigProcessor#fetchConfig 真正能获取到的部分
/
private Map<String, Object> config;
/
*
* 持久到本地的全路径名称,空代表不持久化(不需要则留空即可)
*/
private String persistentFileName;
}
最简参数示例:
{
"config":{
"keyA":"valueA",
"keyB":"valueB",
"tips":"config内可放置任意层级的 JSON 数据,完全透传"
}
}
以下为控制台完整配置,再次强调
执行模式必须选择为:广播执行!!!
定时信息:建议选择1分钟CRON(参数1分钟更新一次)表达式:0 * * * * ?

使用代码示例(非常简单,通过 ConfigProcessor.fetchConfig()获取到 config):
// 测试配置中心获取数据
Map<String, Object> dynamicConfig = ConfigProcessor.fetchConfig();
Object valueA = dynamicConfig.get("keyA");
logger.info("[Test] dynamicConfig: {}, fetchByKeyA: {}", dynamicConfig, valueA);

九 相关文章
开源大佬发布的相关文章解释
1 PowerJob 技术综述
https://mp.weixin.qq.com/s?__biz=MzA5MzYyNzQ0MQ&mid=2247487354&idx=1&sn=dafe57e786cc03b157bea6cdf5433623&chksm=905bba24a72c33324599e817e4d959612d33d60ba4b5a6d24c7b451b1e8f48df5f8217a7b6aa&scene=178&cur_album_id=1424305043590791168#rd
2 serve与worker的通信(Akka Toolkit组件)
https://mp.weixin.qq.com/s?__biz=MzA5MzYyNzQ0MQ
&mid=2247487785&idx=1&sn=a4136f60de33ba5d958f1acf7a17e39f&chksm=905ba477a72c2d61b22dbf412c4bb6497056d408588d15186b73727e3d20ef88348b41cd13f3&scene=178&cur_album_id=1424305043590791168#rd
3 PowerJob 超强大的调度层(时间轮、可靠调度——WAL)
https://mp.weixin.qq.com/s?__biz=MzA5MzYyNzQ0MQ&mid=2247487928&idx=1&sn=ebeef9c57202a19c4267fe50273ce2e9&chksm=905ba4e6a72c2df050c97f72bc0291c5fbb4d07ff78103323872c83ec147d887e798678ad152&scene=178&cur_album_id=1424305043590791168#rd
4 PowerJob 的自实现高可用方案(分组隔离)
https://mp.weixin.qq.com/s?__biz=MzA5MzYyNzQ0MQ
&mid=2247488202&idx=1&sn=129f0e65ee1d5104e2db391e4ef67e50&chksm=905ba794a72c2e824b9ebc642eb55af1ec6215144f0f660c2a6163c3aafe10d606835edacbe3&scene=178&cur_album_id=1424305043590791168#rd
PowerJob源码分析-分组隔离设计: https://zhuanlan.zhihu.com/p/163487886
PowerJob源码解读1:Server和Worker之间的通信解读: https://juejin.im/post/6854573208524800008
5 PowerJob 在线日志(H2数据库)
https://mp.weixin.qq.com/s?__biz=MzA5MzYyNzQ0MQ&mid=2247488651&idx=1&sn=769bea7d6f71eab1568e24855cdfa9e0&chksm=905ba1d5a72c28c3867cb0c6438b64844b07a692d9260cfb384e11cf2e837225bdb144245053&scene=178&cur_album_id=1424305043590791168#rd
6 PowerJob 的序列化方案(kryo,线程安全ThreadLocal)
https://mp.weixin.qq.com/s?__biz=MzA5MzYyNzQ0MQ
&mid=2247489138&idx=1&sn=494a4d30e54f1a2c528055972286e3e3&chksm=905ba32ca72c2a3a65c9ce826a2a38f1e3166249f0c9c67cab80687da6e11bdb93f830ad6597&scene=178&cur_album_id=1424305043590791168#rd
7 PowerJob 应对庞大任务的锦囊妙计:MapReduce
https://mp.weixin.qq.com/s?__biz=MzA5MzYyNzQ0MQ==&mid=2247489177&idx=1&sn=63857e611a3ce91bcfd1e26cc53826d6&chksm=905ba3c7a72c2ad150cf886beb865b9ea383a36d37329d1b2160f4de9cea775f231a9934b370&scene=178&cur_album_id=1424305043590791168#rd

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/908702.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

角球预测方法:基于模糊区域划分与可解释增强模型的概率估计

​引言 在现代竞技运动中,量化评估运动员表现一直是数据分析领域的核心挑战。由于得分事件相对稀少,传统基于简单计数的统计方法往往无法准确反映运动员对比赛结果的实际贡献。近年来,随着事件数据采集技术的进步,基于期望值的表现评估方法逐渐成为研究热点,其中角球情境下…

算法备案能加急办理吗?

首先明确一点,算法备案官方审核部门是没有所谓的渠道提供快速审核服务的,所有申请者都是一视同仁。其次,我们确实有办法提高审核效率,加快算法备案进度。大家要注意识别所谓的加速办理是哪种情况以下是一些算法备案审核周期的重要信息,供大家参考:一、算法备案官方周期 1…

Windows11+OBS+视频号+麦克风设置直播操作流程

OBS+视频号直播操作流程 一、前期准备 1、可用于直播的电脑,我的是Win11系统 2、硬件设备(相机、采集卡、麦克风等) 3、软件(微信、OBS) 4、虚拟声卡 注:这个教程主要说一下声卡的配置,所以相机和采集卡之类的没有讲到 二、软件安装 微信和OBS这个都不会安装就别折腾了,所…

2025年天梯赛补题记录——整数的持续性

为什么没写出来:哈哈,看到400ms就不想写了,被前面一个题目卡了两次时间心态崩了,头脑发昏以为直接算过去会超时(能说那个时候快困死了脑袋很不灵光吗,给自己的无能找借口嘻嘻) 优化思路: 1.记忆化缓存:一想便知道每个数的分解都算一次很费时间,可以联想到记忆化缓存—…

《上古卷轴3:晨风》——存档技能数据修改

《上古卷轴3:晨风》由于其mod广泛开发,使得游戏的生命力非常强大,至今仍受广大RPG迷的喜爱!但晨风的技能数据如果用CE去修改,则是无用的。这里提供了技能数据的存档顺序,因此可以利用hex editor类的软件直接修改存档CE修改失效 《上古卷轴3:晨风》由于其mod广泛开发,使…

记录一次Armbian安装宝塔面板遇到ModuleNotFoundError: No module named _sqlite3的问题

如果在用Armbian安装宝塔面板的时候遇到ModuleNotFoundError: No module named _sqlite3报错,并且无法进入web面板界面,可以尝试以下操作。报错界面展示:步骤1:更换或添加Ubuntu软件源地址到/etc/apt/source.list.d文件夹的文件中 例如:将下面的地址添加到/etc/apt/source…

Cesium中glb模型颜色暗淡解决

问题: 3dmax导出fbx,此fbx文件导入blender中,再由blender导出成glb模型,该glb模型放入cesium中贴图颜色颜色暗沉无光,试了各种办法(泛光、时差、多光源、唯一光)效果均不明显。 原因: 发现,转格式过程中不知道哪一环出错,会导致模型材质一个叫metallicFactor的属性格…

工业相机与采集卡配套方案:构建高性能机器视觉系统的核心要素

机器视觉技术作为人工智能和智能制造的关键组成部分,正日益广泛地应用于工业自动化、质量检测、机器人引导等领域。而一套高性能的机器视觉系统,离不开工业相机和采集卡的完美配合。工业相机负责图像的采集,采集卡则负责将相机采集到的图像数据传输到计算机进行分析和处理。…

在Linux环境下搭建Dify

在Linux环境下搭建Dify Dify的概述 Dify是一款开源的大语言模型(LLM)应用开发平台。它融合了后端即服务(Backend as Service)和LLMOps的理念,使开发者可以快速搭建生成级的生成式AI应用。即使你是非技术人员,也能参与到AI应用的定义和数据运营过程中。由于Dify内置了构建LLM应…

Linux密钥认证及Windows使用密钥连接Linux

概述 Linux中我们要连接主机,输入用户名密码然后连接,我们发现每次连接都要输入密码,对于一些批量操作不方便 我们需要一种新的认证方式,每次连接不需要输入密码,这个方法就叫密钥认证 密钥认证原理原理详解:使用ssh-keygen命令生成私钥和公钥。 使用ssh-copy-id命令将公…