Spring Batch 批处理框架

一、SpringBatch 介绍

Spring Batch 是一个轻量级、全面的批处理框架,旨在支持开发对企业系统的日常操作至关重要的健壮的批处理应用程序。Spring Batch 建立在人们期望的 Spring Framework 特性(生产力、基于 POJO 的开发方法和一般易用性)的基础上,同时使开发人员可以在必要时轻松访问和使用更高级的企业服务。

Spring Batch 不是一个调度框架。在商业和开源领域都有许多优秀的企业调度程序(例如 Quartz、Tivoli、Control-M 等)。Spring Batch 旨在与调度程序结合使用,而不是替代调度程序。
在这里插入图片描述

二、业务场景

我们在业务开发中经常遇到这种情况:
在这里插入图片描述

Spring Batch 支持以下业务场景:

  • 定期提交批处理。
  • 并发批处理:并行处理作业。
  • 分阶段的企业消息驱动处理。
  • 大规模并行批处理。
  • 失败后手动或计划重启。
  • 相关步骤的顺序处理(扩展到工作流驱动的批次)。
  • 部分处理:跳过记录(例如,在回滚时)。
  • 整批交易,适用于批量较小或已有存储过程或脚本的情况。

三、基础知识

3.1、整体架构

官方文档:https://docs.spring.io/spring-batch/docs/current/reference/html/index-single.html#domainLanguageOfBatch
在这里插入图片描述

名称作用
JobRepository为所有的原型(Job、JobInstance、Step)提供持久化的机制
JobLauncherJobLauncher表示一个简单的接口,用于启动一个Job给定的集合 JobParameters
JobJob是封装了整个批处理过程的实体
StepStep是一个域对象,它封装了批处理作业的一个独立的顺序阶段

3.2、核心接口

  • ItemReader: is an abstraction that represents the output of a Step,
    one batch or chunk of items at a time
  • ItemProcessor:an abstraction that represents the business processing
    of an item.
  • ItemWriter: is an abstraction that represents the output of a Step,
    one batch or chunk of items at a time.
    在这里插入图片描述
    大体即为 输入→数据加工→输出 ,一个Job定义多个Step及处理流程,一个Step通常涵盖ItemReader、ItemProcessor、ItemWriter

四、基础实操

4.0、引入 SpringBatch

pom 文件引入 springboot

<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.2.5.RELEASE</version><relativePath/> <!-- lookup parent from repository -->
</parent>

pom 文件引入 spring-batch 及相关依赖

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-batch</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-validation</artifactId></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-jdbc</artifactId></dependency></dependencies>

mysql 创建依赖的库表
在这里插入图片描述
sql 脚本的 jar 包路径:…\maven\repository\org\springframework\batch\spring-batch-core\4.2.1.RELEASE\spring-batch-core-4.2.1.RELEASE.jar!\org\springframework\batch\core\schema-mysql.sql

启动类标志@EnableBatchProcessing

@SpringBootApplication
@EnableBatchProcessing
public class SpringBatchStartApplication
{public static void main(String[] args) {SpringApplication.run(SpringBatchStartApplication.class, args);}
}

FirstJobDemo

@Component
public class FirstJobDemo {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Beanpublic Job firstJob() {return jobBuilderFactory.get("firstJob").start(step()).build();}private Step step() {return stepBuilderFactory.get("step").tasklet((contribution, chunkContext) -> {System.out.println("执行步骤....");return RepeatStatus.FINISHED;}).build();}
}

4.1、流程控制

A、多步骤任务

@Bean
public Job multiStepJob() {return jobBuilderFactory.get("multiStepJob2").start(step1()).on(ExitStatus.COMPLETED.getExitCode()).to(step2()).from(step2()).on(ExitStatus.COMPLETED.getExitCode()).to(step3()).from(step3()).end().build();
}private Step step1() {return stepBuilderFactory.get("step1").tasklet((stepContribution, chunkContext) -> {System.out.println("执行步骤一操作。。。");return RepeatStatus.FINISHED;}).build();
}private Step step2() {return stepBuilderFactory.get("step2").tasklet((stepContribution, chunkContext) -> {System.out.println("执行步骤二操作。。。");return RepeatStatus.FINISHED;}).build();
}private Step step3() {return stepBuilderFactory.get("step3").tasklet((stepContribution, chunkContext) -> {System.out.println("执行步骤三操作。。。");return RepeatStatus.FINISHED;}).build();
}

B、并行执行
创建了两个 Flow:flow1(包含 step1 和 step2)和 flow2(包含 step3)。然后通过JobBuilderFactory的split方法,指定一个异步执行器,将 flow1 和 flow2 异步执行(也就是并行)

@Component
public class SplitJobDemo {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Beanpublic Job splitJob() {return jobBuilderFactory.get("splitJob").start(flow1()).split(new SimpleAsyncTaskExecutor()).add(flow2()).end().build();}private Step step1() {return stepBuilderFactory.get("step1").tasklet((stepContribution, chunkContext) -> {System.out.println("执行步骤一操作。。。");return RepeatStatus.FINISHED;}).build();}private Step step2() {return stepBuilderFactory.get("step2").tasklet((stepContribution, chunkContext) -> {System.out.println("执行步骤二操作。。。");return RepeatStatus.FINISHED;}).build();}private Step step3() {return stepBuilderFactory.get("step3").tasklet((stepContribution, chunkContext) -> {System.out.println("执行步骤三操作。。。");return RepeatStatus.FINISHED;}).build();}private Flow flow1() {return new FlowBuilder<Flow>("flow1").start(step1()).next(step2()).build();}private Flow flow2() {return new FlowBuilder<Flow>("flow2").start(step3()).build();}
}

C、任务决策
决策器的作用就是可以指定程序在不同的情况下运行不同的任务流程,比如今天是周末,则让任务执行 step1 和 step2,如果是工作日,则之心 step1 和 step3。

@Component
public class MyDecider implements JobExecutionDecider {@Overridepublic FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {LocalDate now = LocalDate.now();DayOfWeek dayOfWeek = now.getDayOfWeek();if (dayOfWeek == DayOfWeek.SATURDAY || dayOfWeek == DayOfWeek.SUNDAY) {return new FlowExecutionStatus("weekend");} else {return new FlowExecutionStatus("workingDay");}}
}
@Bean
public Job deciderJob() {return jobBuilderFactory.get("deciderJob").start(step1()).next(myDecider).from(myDecider).on("weekend").to(step2()).from(myDecider).on("workingDay").to(step3()).from(step3()).on("*").to(step4()).end().build();
}
private Step step1() {return stepBuilderFactory.get("step1").tasklet((stepContribution, chunkContext) -> {System.out.println("执行步骤一操作。。。");return RepeatStatus.FINISHED;}).build();
}private Step step2() {return stepBuilderFactory.get("step2").tasklet((stepContribution, chunkContext) -> {System.out.println("执行步骤二操作。。。");return RepeatStatus.FINISHED;}).build();
}private Step step3() {return stepBuilderFactory.get("step3").tasklet((stepContribution, chunkContext) -> {System.out.println("执行步骤三操作。。。");return RepeatStatus.FINISHED;}).build();
}private Step step4() {return stepBuilderFactory.get("step4").tasklet((stepContribution, chunkContext) -> {System.out.println("执行步骤四操作。。。");return RepeatStatus.FINISHED;}).build();
}

D、任务嵌套
任务 Job 除了可以由 Step 或者 Flow 构成外,我们还可以将多个任务 Job 转换为特殊的 Step,然后再赋给另一个任务 Job,这就是任务的嵌套。

@Component
public class NestedJobDemo {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Autowiredprivate JobLauncher jobLauncher;@Autowiredprivate JobRepository jobRepository;@Autowiredprivate PlatformTransactionManager platformTransactionManager;// 父任务@Beanpublic Job parentJob() {return jobBuilderFactory.get("parentJob").start(childJobOneStep()).next(childJobTwoStep()).build();}// 将任务转换为特殊的步骤private Step childJobOneStep() {return new JobStepBuilder(new StepBuilder("childJobOneStep")).job(childJobOne()).launcher(jobLauncher).repository(jobRepository).transactionManager(platformTransactionManager).build();}// 将任务转换为特殊的步骤private Step childJobTwoStep() {return new JobStepBuilder(new StepBuilder("childJobTwoStep")).job(childJobTwo()).launcher(jobLauncher).repository(jobRepository).transactionManager(platformTransactionManager).build();}// 子任务一private Job childJobOne() {return jobBuilderFactory.get("childJobOne").start(stepBuilderFactory.get("childJobOneStep").tasklet((stepContribution, chunkContext) -> {System.out.println("子任务一执行步骤。。。");return RepeatStatus.FINISHED;}).build()).build();}// 子任务二private Job childJobTwo() {return jobBuilderFactory.get("childJobTwo").start(stepBuilderFactory.get("childJobTwoStep").tasklet((stepContribution, chunkContext) -> {System.out.println("子任务二执行步骤。。。");return RepeatStatus.FINISHED;}).build()).build();}
}

4.2、读取数据

定义 Model TestData,下面同一

@Data
public class TestData {private int id;private String field1;private String field2;private String field3;
}

读取数据包含:文本数据读取、数据库数据读取、XML 数据读取、JSON 数据读取等,具体自己查资料。

文本数据读取 Demo

@Component
public class FileItemReaderDemo {// 任务创建工厂@Autowiredprivate JobBuilderFactory jobBuilderFactory;// 步骤创建工厂@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Beanpublic Job fileItemReaderJob() {return jobBuilderFactory.get("fileItemReaderJob2").start(step()).build();}private Step step() {return stepBuilderFactory.get("step").<TestData, TestData>chunk(2).reader(fileItemReader()).writer(list -> list.forEach(System.out::println)).build();}private ItemReader<TestData> fileItemReader() {FlatFileItemReader<TestData> reader = new FlatFileItemReader<>();reader.setResource(new ClassPathResource("reader/file")); // 设置文件资源地址reader.setLinesToSkip(1); // 忽略第一行// AbstractLineTokenizer的三个实现类之一,以固定分隔符处理行数据读取,// 使用默认构造器的时候,使用逗号作为分隔符,也可以通过有参构造器来指定分隔符DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();// 设置属性名,类似于表头tokenizer.setNames("id", "field1", "field2", "field3");// 将每行数据转换为TestData对象DefaultLineMapper<TestData> mapper = new DefaultLineMapper<>();// 设置LineTokenizermapper.setLineTokenizer(tokenizer);// 设置映射方式,即读取到的文本怎么转换为对应的POJOmapper.setFieldSetMapper(fieldSet -> {TestData data = new TestData();data.setId(fieldSet.readInt("id"));data.setField1(fieldSet.readString("field1"));data.setField2(fieldSet.readString("field2"));data.setField3(fieldSet.readString("field3"));return data;});reader.setLineMapper(mapper);return reader;}}

4.3、输出数据

输出数据也包含:文本数据读取、数据库数据读取、XML 数据读取、JSON 数据读取等

@Component
public class FileItemWriterDemo {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Resource(name = "writerSimpleReader")private ListItemReader<TestData> writerSimpleReader;@Beanpublic Job fileItemWriterJob() throws Exception {return jobBuilderFactory.get("fileItemWriterJob").start(step()).build();}private Step step() throws Exception {return stepBuilderFactory.get("step").<TestData, TestData>chunk(2).reader(writerSimpleReader).writer(fileItemWriter()).build();}private FlatFileItemWriter<TestData> fileItemWriter() throws Exception {FlatFileItemWriter<TestData> writer = new FlatFileItemWriter<>();FileSystemResource file = new FileSystemResource("D:/code/spring-batch-demo/src/main/resources/writer/writer-file");Path path = Paths.get(file.getPath());if (!Files.exists(path)) {Files.createFile(path);}// 设置输出文件路径writer.setResource(file);// 把读到的每个TestData对象转换为JSON字符串LineAggregator<TestData> aggregator = item -> {try {ObjectMapper mapper = new ObjectMapper();return mapper.writeValueAsString(item);} catch (JsonProcessingException e) {e.printStackTrace();}return "";};writer.setLineAggregator(aggregator);writer.afterPropertiesSet();return writer;}}

4.5、处理数据

@Component
public class ValidatingItemProcessorDemo {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Resource(name = "processorSimpleReader")private ListItemReader<TestData> processorSimpleReader;@Beanpublic Job validatingItemProcessorJob() throws Exception {return jobBuilderFactory.get("validatingItemProcessorJob3").start(step()).build();}private Step step() throws Exception {return stepBuilderFactory.get("step").<TestData, TestData>chunk(2).reader(processorSimpleReader).processor(beanValidatingItemProcessor()).writer(list -> list.forEach(System.out::println)).build();}//    private ValidatingItemProcessor<TestData> validatingItemProcessor() {
//        ValidatingItemProcessor<TestData> processor = new ValidatingItemProcessor<>();
//        processor.setValidator(value -> {
//            // 对每一条数据进行校验
//            if ("".equals(value.getField3())) {
//                // 如果field3的值为空串,则抛异常
//                throw new ValidationException("field3的值不合法");
//            }
//        });
//        return processor;
//    }private BeanValidatingItemProcessor<TestData> beanValidatingItemProcessor() throws Exception {BeanValidatingItemProcessor<TestData> beanValidatingItemProcessor = new BeanValidatingItemProcessor<>();// 开启过滤,不符合规则的数据被过滤掉;
//        beanValidatingItemProcessor.setFilter(true);beanValidatingItemProcessor.afterPropertiesSet();return beanValidatingItemProcessor;}}

4.6、任务调度

可以配合 quartz 或者 xxljob 实现定时任务执行

@RestController
@RequestMapping("job")
public class JobController {@Autowiredprivate Job job;@Autowiredprivate JobLauncher jobLauncher;@GetMapping("launcher/{message}")public String launcher(@PathVariable String message) throws Exception {JobParameters parameters = new JobParametersBuilder().addString("message", message).toJobParameters();// 将参数传递给任务jobLauncher.run(job, parameters);return "success";}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/8186.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

通信相关知识(三) 接入网

接入网的定界 接入网的功能 用户口功能、业务口功能、核心功能、传送功能、接入网系统管理功能。 ADSL 非对称数字用户线路&#xff08;ADSL&#xff0c;Asymmetric Digital Subscriber Line&#xff09;是数字用户线路&#xff08;xDSL&#xff0c;Digital Subscriber Lin…

结构光三维测量几种比较成熟的方法

1.飞行时间发 原理:通过直接测量光传播的时间,确定物体的面型。发射脉冲信号,接受发射回的光,计算距离。 精度:毫米级 优点:原理简单,可避免阴影和遮挡等问题,且仪器便携化。 缺点:精度相对较低 2.莫尔条纹法 原理:采用两组光栅,一个主光栅,一个基准光栅,通过…

yolov7

一、 ELAN [-1, 1, Conv, [64, 1, 1]],[-2, 1, Conv, [64, 1, 1]],[-1, 1, Conv, [64, 3, 1]],[-1, 1, Conv, [64, 3, 1]],[-1, 1, Conv, [64, 3, 1]],[-1, 1, Conv, [64, 3, 1]],[[-1, -3, -5, -6], 1, Concat, [1]],[-1, 1, Conv, [256, 1, 1]], # 11二、E-ELAN 三、模型缩放…

Docker是什么以及docker的常用命令

Docker简介 Docker是一种开源的容器化平台&#xff0c;用于构建、部署和运行应用程序。它允许开发人员将应用程序及其所有依赖项打包到一个称为"容器"的独立单元中。这个容器可以在任何支持Docker的环境中运行&#xff0c;无论是开发人员的本地机器、虚拟机还是云服务…

配置Jenkins的slave agent并使用它完成构建任务

上一章&#xff0c;使用单机配置并运行了一个简单的maven项目&#xff0c;并发布到了一个服务器上启动。这一章将要配置一个slave agent&#xff0c;并将上一章的job放到agent上执行。我们agent使用的是ssh的方式 前置步骤 准备两台虚拟机&#xff1a; 192.168.233.32&#…

欧科云链2023年报:毛利达1.55亿港元,数字资产业务成最大增长点

据香港商报报道&#xff0c;2023年6月28日&#xff0c;欧科云链控股有限公司&#xff08;以下简称“欧科云链”&#xff09;及其附属公司&#xff08;股份代号&#xff1a;1499.HK&#xff0c;以下简称“集团”&#xff09;发布了截至2023年3月31日的年度报告。报告期内&#x…

Redis【实战篇】---- Redis消息队列

Redis【实战篇】---- Redis消息队列 1. Redis消息队列 - 认识消息队列2. Redis消息队列 - 基于List实现消息队列3. Redis消息队列 - 基于PubSub的消息队列4. Redis消息队列 - 基于Stream的消息队列5. Redis消息队列 - 基于Stream的消息队列-消费组6. 基于Redis的Stream结构作为…

MAC OS X 这个“安装 macOS Xxx Xxx”应用程序副本已损坏,不能用来安装 macOS,超级终端修改日期date 已解决

原因&#xff1a;旧版 macOS 证书已经过期 解决方法&#xff1a;断开互联网&#xff0c;修改系统时间 date 102013142018.20 说明&#xff1a;10是月&#xff0c;20是日&#xff0c;13是时&#xff0c;14是分&#xff0c;2018是年&#xff0c;20是秒 输入上面的代码按回车后…

SpringBoot操作Excel实现导入和导出功能(详细讲解+Gitee源码)

前言&#xff1a;在日常的开发中&#xff0c;避免不了操作Excel&#xff0c;比如从系统当中导出一个报表&#xff0c;或者通过解析客户上传的Excel文件进行批量解析数据入库等等&#xff0c;本篇博客主要汇总日常开发中如何使用开源的Apache提供的POI流操作Excel进行导入导出功…

【035】C++泛型编程(模板)实践:设计数组类模板模仿vector容器

C泛型编程&#xff08;模板&#xff09;实践 引言一、类模板的概述二、实现数组类模板三、类模板的继承3.1、类模板派生出普通类3.2、类模板派生出类模板 总结 引言 &#x1f4a1; 作者简介&#xff1a;专注于C/C高性能程序设计和开发&#xff0c;理论与代码实践结合&#xff0…

常州工学院单片机及应用系统设计2021-2022 学年第 二 学期 考试类型 开卷 课程编码 0302005

第一题 #include "SC95F861x_C.H" #include <INTRINS.H> unsigned char keydata0; void delay(unsigned int timer) //延时函数 { while(timer>0) timer--; } void IOinit() { P5CON0x00; P5PH0x03; P3CON0xFF; P3PH0xFF; } void readke…

回归预测 | MATLAB实现PSO-DNN粒子群算法优化深度神经网络的数据多输入单输出回归预测

回归预测 | MATLAB实现PSO-DNN粒子群算法优化深度神经网络的数据多输入单输出回归预测 目录 回归预测 | MATLAB实现PSO-DNN粒子群算法优化深度神经网络的数据多输入单输出回归预测效果一览基本介绍模型描述程序设计参考资料 效果一览 基本介绍 回归预测 | MATLAB实现PSO-DNN粒子…