【Spring云原生】Spring Batch:海量数据高并发任务处理!数据处理纵享新丝滑!事务管理机制+并行处理+实例应用讲解

 🎉🎉欢迎光临🎉🎉

🏅我是苏泽,一位对技术充满热情的探索者和分享者。🚀🚀

🌟特别推荐给大家我的最新专栏《Spring 狂野之旅:从入门到入魔》 🚀

本专栏带你从Spring入门到入魔!

这是苏泽的个人主页可以看到我其他的内容哦👇👇

努力的苏泽icon-default.png?t=N7T8http://suzee.blog.csdn.net/

本文重点讲解原理!如要看批量数据处理的实战请关注下文(后续补充敬请关注):

实例应用:数据清洗和转换

使用Spring Batch清洗和转换数据

实例应用:数据导入和导出

使用Spring Batch导入和导出数据

实例应用:批处理定时任务

使用Spring Batch实现定时任务

目录

实例应用:数据清洗和转换

使用Spring Batch清洗和转换数据

实例应用:数据导入和导出

使用Spring Batch导入和导出数据

实例应用:批处理定时任务

使用Spring Batch实现定时任务

介绍Spring Batch

Spring Batch入门

解析

需求缔造:假设我们有一个需求,需要从一个CSV文件中读取学生信息,对每个学生的成绩进行转换和校验,并将处理后的学生信息写入到一个数据库表中。

数据处理

扩展Spring Batch

自定义读取器、写入器和处理器

 与其他Spring项目的集成

与Spring Integration的集成:

与Spring Cloud Task的集成:


介绍Spring Batch

Spring Batch是一个基于Java的开源批处理框架,用于处理大规模、重复性和高可靠性的任务。它提供了一种简单而强大的方式来处理批处理作业,如数据导入/导出、报表生成、批量处理等。

什么是Spring Batch?

Spring Batch旨在简化批处理作业的开发和管理。它提供了一种可扩展的模型来定义和执行批处理作业,将作业划分为多个步骤(Step),每个步骤又由一个或多个任务块(Chunk)组成。通过使用Spring Batch,可以轻松处理大量的数据和复杂的业务逻辑。

Spring Batch的特点和优势

  1. 可扩展性和可重用性:Spring Batch采用模块化的设计,提供了丰富的可扩展性和可重用性。可以根据具体需求自定义作业流程,添加或删除步骤,灵活地适应不同的批处理场景。

  2. 事务管理:Spring Batch提供了强大的事务管理机制,确保批处理作业的数据一致性和完整性。可以配置事务边界,使每个步骤或任务块在单独的事务中执行,保证了作业的可靠性。

  3. 监控和错误处理:Spring Batch提供了全面的监控和错误处理机制。可以通过监听器和回调函数来监控作业的执行情况,处理错误和异常情况,以及记录和报告作业的状态和指标。

  4. 并行处理:Spring Batch支持并行处理,可以将作业划分为多个独立的线程或进程来执行,提高作业的处理速度和效率。

Spring Batch入门

1. 安装和配置Spring Batch

首先,确保你的Java开发环境已经安装并配置好。然后,可以使用Maven或Gradle等构建工具来添加Spring Batch的依赖项到你的项目中。详细的安装和配置可以参考Spring Batch的官方文档。

2. 创建第一个批处理作业

在Spring Batch中,一个批处理作业由一个或多个步骤组成,每个步骤又由一个或多个任务块组成。下面是一个简单的示例,演示如何创建一个简单的批处理作业:

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Beanpublic Step step1() {return stepBuilderFactory.get("step1").tasklet((contribution, chunkContext) -> {System.out.println("Hello, Spring Batch!");return RepeatStatus.FINISHED;}).build();}@Beanpublic Job job(Step step1) {return jobBuilderFactory.get("job").start(step1).build();}
}

解析

首先使用@Configuration@EnableBatchProcessing注解将类标记为Spring Batch的配置类。然后,使用JobBuilderFactoryStepBuilderFactory创建作业和步骤的构建器。在step1方法中,定义了一个简单的任务块,打印"Hello, Spring Batch!"并返回RepeatStatus.FINISHED。最后,在job方法中,使用jobBuilderFactory创建一个作业,并将step1作为作业的起始步骤。

3. 理解Job、Step和任务块

  • Job(作业):作业是一个独立的批处理任务,由一个或多个步骤组成。它描述了整个批处理过程的流程和顺序,并可以有自己的参数和配置。

  • Step(步骤块):步骤是作业的组成部分,用于执行特定的任务。一个作业可以包含一个或多个步骤,每个步骤都可以定义自己的任务和处理逻辑。

  • 任务块(Chunk):任务块是步骤的最小执行单元,用于处理一定量的数据。任务块将数据分为一块一块进行处理,可以定义读取数据、处理数据和写入数据的逻辑。

需求缔造:
假设我们有一个需求,需要从一个CSV文件中读取学生信息,对每个学生的成绩进行转换和校验,并将处理后的学生信息写入到一个数据库表中。

数据处理

  • 数据读取和写入:Spring Batch提供了多种读取和写入数据的方式。可以使用ItemReader读取数据,例如从数据库、文件或消息队列中读取数据。然后使用ItemWriter将处理后的数据写入目标,如数据库表、文件或消息队列。
    首先,我们需要定义一个数据模型来表示学生信息,例如
    public class Student {private String name;private int score;// Getters and setters// ...
    }

    接下来,我们可以使用Spring Batch提供的FlatFileItemReader来读取CSV文件中的数据:

    @Bean
    public FlatFileItemReader<Student> studentItemReader() {FlatFileItemReader<Student> reader = new FlatFileItemReader<>();reader.setResource(new ClassPathResource("students.csv"));reader.setLineMapper(new DefaultLineMapper<Student>() {{setLineTokenizer(new DelimitedLineTokenizer() {{setNames(new String[] { "name", "score" });}});setFieldSetMapper(new BeanWrapperFieldSetMapper<Student>() {{setTargetType(Student.class);}});}});return reader;
    }

支持的数据格式和数据源

  • Spring Batch支持各种数据格式和数据源。可以使用适配器和读写器来处理不同的数据格式,如CSV、XML、JSON等。同时,可以通过自定义的数据读取器和写入器来处理不同的数据源,如关系型数据库、NoSQL数据库等。

数据转换和校验

  • Spring Batch提供了数据转换和校验的机制。可以使用ItemProcessor对读取的数据进行转换、过滤和校验。ItemProcessor可以应用自定义的业务逻辑来处理每个数据项。

      我们配置了一个FlatFileItemReader,设置了CSV文件的位置和行映射器,指定了字段分隔符和字段到模型属性的映射关系。

    接下来,我们可以定义一个ItemProcessor来对读取的学生信息进行转换和校验:

    @Bean
    public ItemProcessor<Student, Student> studentItemProcessor() {return new ItemProcessor<Student, Student>() {@Overridepublic Student process(Student student) throws Exception {// 进行转换和校验if (student.getScore() < 0) {// 校验不通过,抛出异常throw new IllegalArgumentException("Invalid score for student: " + student.getName());}// 转换操作,例如将分数转换为百分制int percentage = student.getScore() * 10;student.setScore(percentage);return student;}};
    }
     

     在上述代码中,我们定义了一个ItemProcessor,对学生信息进行校验和转换。如果学生的分数小于0,则抛出异常;否则,将分数转换为百分制。

    最后,我们可以使用Spring Batch提供的JdbcBatchItemWriter将处理后的学生信息写入数据库:

    @Bean
    public JdbcBatchItemWriter<Student> studentItemWriter(DataSource dataSource) {JdbcBatchItemWriter<Student> writer = new JdbcBatchItemWriter<>();writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());writer.setSql("INSERT INTO students (name, score) VALUES (:name, :score)");writer.setDataSource(dataSource);return writer;
    }

作业调度和监控

  • 作业调度器的配置:Spring Batch提供了作业调度器来配置和管理批处理作业的执行。可以使用Spring的调度框架(如Quartz)或操作系统的调度工具(如cron)来调度作业。通过配置作业调度器,可以设置作业的触发时间、频率和其他调度参数。
     

    在上述代码中,我们配置了一个JdbcBatchItemWriter,设置了SQL语句和数据源,将处理后的学生信息批量插入数据库表中。

    最后,我们需要配置一个作业步骤来组装数据读取、处理和写入的过程:

    @Bean
    public Step processStudentStep(ItemReader<Student> reader, ItemProcessor<Student, Student> processor, ItemWriter<Student> writer) {return stepBuilderFactory.get("processStudentStep").<Student, Student>chunk(10).reader(reader).processor(processor).writer(writer).build();
    }

    在上述代码中,我们使用stepBuilderFactory创建了一个步骤,并指定了数据读取器、处理器和写入器。

  • 作业执行的监控和管理:Spring Batch提供了丰富的监控和管理功能。可以使用Spring Batch的管理接口和API来监控作业的执行状态、进度和性能指标。还可以使用日志记录、通知和报警机制来及时获取作业执行的状态和异常信息。
     

    最后,我们可以配置一个作业来调度执行该步骤:

    @Bean
    public Job processStudentJob(JobBuilderFactory jobBuilderFactory, Step processStudentStep) {return jobBuilderFactory.get("processStudentJob").flow(processStudentStep).end().build();
    }

    我们使用jobBuilderFactory创建了一个作业,并指定了步骤来执行。

    通过以上的示例,我们演示了Spring Batch中数据读取和写入的方式,使用了FlatFileItemReader读取CSV文件,使用了JdbcBatchItemWriter将处理后的学生信息写入数据库。同时,我们使用了ItemProcessor对读取的学生信息进行转换和校验。这个例子还展示了Spring Batch对不同数据源和数据格式的支持,以及如何配置和组装作业步骤来完成整个批处理任务。

错误处理和重试机制

  • Spring Batch提供了错误处理和重试机制,以确保批处理作业的稳定性和可靠性。可以配置策略来处理读取、处理和写入过程中的错误和异常情况。可以设置重试次数、重试间隔和错误处理策略,以适应不同的错误场景和需求。
    首先,我们可以在步骤配置中设置错误处理策略。例如,我们可以使用SkipPolicy来跳过某些异常,或者使用RetryPolicy来进行重试。
    @Bean
    public Step processStudentStep(ItemReader<Student> reader, ItemProcessor<Student, Student> processor, ItemWriter<Student> writer) {return stepBuilderFactory.get("processStudentStep").<Student, Student>chunk(10).reader(reader).processor(processor).writer(writer).faultTolerant().skip(Exception.class).skipLimit(10).retry(Exception.class).retryLimit(3).build();
    }

    我们使用faultTolerant()方法来启用错误处理策略。然后,使用skip(Exception.class)指定跳过某些异常,使用skipLimit(10)设置跳过的最大次数为10次。同时,使用retry(Exception.class)指定重试某些异常,使用retryLimit(3)设置重试的最大次数为3次。

    在默认情况下,如果发生读取、处理或写入过程中的异常,Spring Batch将标记该项为错误项,并尝试跳过或重试,直到达到跳过或重试的次数上限为止。

    此外,您还可以为每个步骤配置错误处理器,以定制化处理错误项的逻辑。例如,可以使用SkipListener来处理跳过的项,使用RetryListener来处理重试的项。
     

    @Bean
    public SkipListener<Student, Student> studentSkipListener() {return new SkipListener<Student, Student>() {@Overridepublic void onSkipInRead(Throwable throwable) {// 处理读取过程中发生的异常}@Overridepublic void onSkipInWrite(Student student, Throwable throwable) {// 处理写入过程中发生的异常}@Overridepublic void onSkipInProcess(Student student, Throwable throwable) {// 处理处理过程中发生的异常}};
    }@Bean
    public RetryListener studentRetryListener() {return new RetryListener() {@Overridepublic <T, E extends Throwable> boolean open(RetryContext retryContext, RetryCallback<T, E> retryCallback) {// 在重试之前执行的逻辑return true;}@Overridepublic <T, E extends Throwable> void onError(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable throwable) {// 处理重试过程中发生的异常}@Overridepublic <T, E extends Throwable> void close(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable throwable) {// 在重试之后执行的逻辑}};
    }@Bean
    public Step processStudentStep(ItemReader<Student> reader, ItemProcessor<Student, Student> processor, ItemWriter<Student> writer,SkipListener<Student, Student> skipListener, RetryListener retryListener) {return stepBuilderFactory.get("processStudentStep").<Student, Student>chunk(10).reader(reader).processor(processor).writer(writer).faultTolerant().skip(Exception.class).skipLimit(10).retry(Exception.class).retryLimit(3).listener(skipListener).listener(retryListener).build();
    }

批处理最佳实践

  • 数据量控制:在批处理作业中,应注意控制数据量的大小,以避免内存溢出或处理速度过慢的问题。可以通过分块(Chunk)处理和分页读取的方式来控制数据量。

  • 事务管理:在批处理作业中,对于需要保证数据一致性和完整性的操作,应使用适当的事务管理机制。可以配置事务边界,确保每个步骤或任务块在独立的事务中执行。

  • 错误处理和日志记录:合理处理错误和异常情况是批处理作业的重要部分。应使用适当的错误处理策略、日志记录和报警机制,以便及时发现和处理问题。

  • 性能调优:在批处理作业中,应关注性能调优的问题。可以通过合理的并行处理、合理配置的线程池和适当的数据读取和写入策略来提高作业的处理速度和效率。

  • 监控和管理:对于长时间运行的批处理作业,应设置适当的监控和管理机制。可以使用监控工具、警报系统和自动化任务管理工具来监控作业的执行情况和性能指标。

扩展Spring Batch

自定义读取器、写入器和处理器

Spring Batch提供了许多扩展点,可以通过自定义读取器、写入器和处理器以及其他组件来扩展和定制批处理作业的功能。

public class MyItemReader implements ItemReader<String> {private List<String> data = Arrays.asList("item1", "item2", "item3");private Iterator<String> iterator = data.iterator();@Overridepublic String read() throws Exception {if (iterator.hasNext()) {return iterator.next();} else {return null;}}
}

自定义写入器:

public class MyItemWriter implements ItemWriter<String> {@Overridepublic void write(List<? extends String> items) throws Exception {for (String item : items) {// 自定义写入逻辑}}
}

自定义处理器:

public class MyItemProcessor implements ItemProcessor<String, String> {@Overridepublic String process(String item) throws Exception {// 自定义处理逻辑return item.toUpperCase();}
}

批处理作业的并行处理:

Spring Batch支持将批处理作业划分为多个独立的步骤,并通过多线程或分布式处理来实现并行处理。

  1. 多线程处理:可以通过配置TaskExecutor来实现多线程处理。通过使用TaskExecutor,每个步骤可以在独立的线程中执行,从而实现并行处理。
    @Bean
    public TaskExecutor taskExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(5);executor.setMaxPoolSize(10);executor.setQueueCapacity(25);return executor;
    }@Bean
    public Step myStep(ItemReader<String> reader, ItemProcessor<String, String> processor, ItemWriter<String> writer) {return stepBuilderFactory.get("myStep").<String, String>chunk(10).reader(reader).processor(processor).writer(writer).taskExecutor(taskExecutor()).build();
    }

    在上述代码中,我们通过taskExecutor()方法定义了一个线程池任务执行器,并将其配置到步骤中的taskExecutor()方法中。

  2. 分布式处理:如果需要更高的并行性和可伸缩性,可以考虑使用分布式处理。Spring Batch提供了与Spring Integration和Spring Cloud Task等项目的集成,以实现分布式部署和处理。

 与其他Spring项目的集成

  1. 与Spring Integration的集成:

首先,需要在Spring Batch作业中配置Spring Integration的消息通道和适配器。可以使用消息通道来发送和接收作业的输入和输出数据,使用适配器来与外部系统进行交互。

@Configuration
@EnableBatchProcessing
@EnableIntegration
public class BatchConfiguration {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Autowiredprivate MyItemReader reader;@Autowiredprivate MyItemProcessor processor;@Autowiredprivate MyItemWriter writer;@Beanpublic IntegrationFlow myJobFlow() {return IntegrationFlows.from("jobInputChannel").handle(jobLaunchingGateway()).get();}@Beanpublic MessageChannel jobInputChannel() {return new DirectChannel();}@Beanpublic MessageChannel jobOutputChannel() {return new DirectChannel();}@Beanpublic MessageChannel stepInputChannel() {return new DirectChannel();}@Beanpublic MessageChannel stepOutputChannel() {return new DirectChannel();}@Beanpublic JobLaunchingGateway jobLaunchingGateway() {SimpleJobLauncher jobLauncher = new SimpleJobLauncher();jobLauncher.setJobRepository(jobRepository());return new JobLaunchingGateway(jobLauncher);}@Beanpublic JobRepository jobRepository() {// 配置作业存储库}@Beanpublic Job myJob() {return jobBuilderFactory.get("myJob").start(step1()).build();}@Beanpublic Step step1() {return stepBuilderFactory.get("step1").<String, String>chunk(10).reader(reader).processor(processor).writer(writer).inputChannel(stepInputChannel()).outputChannel(stepOutputChannel()).build();}
}

在上述代码中,我们配置了Spring Batch作业的消息通道和适配器。myJobFlow()方法定义了一个整合流程,它从名为jobInputChannel的消息通道接收作业请求,并通过jobLaunchingGateway()方法启动作业。jobLaunchingGateway()方法创建一个JobLaunchingGateway实例,用于启动作业。

与Spring Cloud Task的集成:

首先,需要在Spring Batch作业中配置Spring Cloud Task的任务启动器和任务监听器。任务启动器用于启动和管理分布式任务,任务监听器用于在任务执行期间执行一些操作。

@Configuration
@EnableBatchProcessing
@EnableTask
public class BatchConfiguration {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Autowiredprivate MyItemReader reader;@Autowiredprivate MyItemProcessor processor;@Autowiredprivate MyItemWriter writer;@Beanpublic TaskConfigurer taskConfigurer() {return new DefaultTaskConfigurer();}@Beanpublic TaskExecutor taskExecutor() {return new SimpleAsyncTaskExecutor();}@Beanpublic Job myJob() {return jobBuilderFactory.get("myJob").start(step1()).build();}@Beanpublic Step step1() {return stepBuilderFactory.get("step1").<String, String>chunk(10).reader(reader).processor(processor).writer(writer).taskExecutor(taskExecutor()).build();}@Beanpublic TaskListener myTaskListener() {return new MyTaskListener();}@Beanpublic TaskExecutionListener myTaskExecutionListener() {return new MyTaskExecutionListener();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/509289.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

程序环境和预处理(2)

文章目录 3.2.7 命名约定 3.3 #undef3.4 命令行定义3.5 条件编译3.6 文件包含3.6.1 头文件被包含的方式3.6.2 嵌套文件包含 4. 其他预处理指令 3.2.7 命名约定 一般来讲函数和宏的使用语法很相似&#xff0c;所以语言本身没法帮我们区分二者&#xff0c;那我们平时的一个习惯是…

JavaWeb Tomcat启动、部署、配置、集成IDEA

web服务器软件 服务器是安装了服务器软件的计算机&#xff0c;在web服务器软件中&#xff0c;可以部署web项目&#xff0c;让用户通过浏览器来访问这些项目。 Web服务器是一个应用程序&#xff08;软件&#xff09;&#xff0c;对HTTP协议的操作进行封装&#xff0c;使得程序…

CentOS部署FastDFS+Nginx并实现远程访问本地服务器中文件

文章目录 前言1. 本地搭建FastDFS文件系统1.1 环境安装1.2 安装libfastcommon1.3 安装FastDFS1.4 配置Tracker1.5 配置Storage1.6 测试上传下载1.7 与Nginx整合1.8 安装Nginx1.9 配置Nginx 2. 局域网测试访问FastDFS3. 安装cpolar内网穿透4. 配置公网访问地址5. 固定公网地址5.…

LeetCode 刷题 [C++] 第279题.完全平方数

题目描述 给你一个整数 n &#xff0c;返回 和为 n 的完全平方数的最少数量 。 完全平方数是一个整数&#xff0c;其值等于另一个整数的平方&#xff1b;换句话说&#xff0c;其值等于一个整数自乘的积。例如&#xff0c;1、4、9 和 16 都是完全平方数&#xff0c;而 3 和 11…

U盘删除的文件怎么找回?3个精选方法分享!

“在对u盘进行清理时&#xff0c;我删除了很多不需要的文件&#xff0c;但是刚刚检查的时候才发现&#xff0c;有些还需要使用的文件也被删掉了。这可怎么办呢&#xff1f;我应该如何恢复这些文件呢&#xff1f;” U盘便携性、大容量、高速传输以及易于操作的优点使得它在移动办…

【 10X summary report】怎么看?详细解读笔记

报告内容 在开始正式的分析之前&#xff0c;需要查看在对齐和计数过程中生成的任何总结统计信息。下图是由Cell Ranger工具创建的10X总结报告&#xff0c;在从10X scRNA-seq实验生成计数矩阵时会生成。 The left half of the report describes sequencing and mapping statist…

攻防世界-get_post

题目信息 相关知识 -G&#xff1a;表示GET请求&#xff0c;缺省POST -d参数用于发送 POST 请求的数据体 使用-d参数以后&#xff0c;HTTP 请求会自动加上标头Content-Type : application/x-www-form-urlencoded。并且会自动将请求转为 POST 方法&#xff0c;因此可以省略-X PO…

docker 创建RedHat8.5镜像

确定要创建的小红帽版本&#xff0c;可以进入官网查看 https://hub.docker.com/search?qRedHat 复制命令到安装docker的机器上&#xff0c;拉取小红帽镜像。 docker pull redhat/ubi8:latest 执行完成后&#xff0c;查看镜像是否拉取成功 docker images |grep redhat 如图…

Linux安装JumpServer并结合内网穿透实现公网访问本地服务

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…

vscode使用git

更改的文件 点击号 &#xff0c; 相当于git add 添加到暂存区 -号 取消暂存区内容 可以查看更改的前后对比 编辑器左下角点击分支&#xff0c;可以创建新分支 提交到暂存区后&#xff0c;点击 提交 &#xff0c; 编辑备注内容 &#xff0c;相当于git commit -m 提交备注内容 同…

千兆单口(百兆双口)小体积 24PIN 网络变压器 H82409S 特点

Hqst华轩盛(石门盈盛)电子导读&#xff1a;千兆单口&#xff08;百兆双口&#xff09;小体积 24PIN 网络变压器 H82409S 特点 大家好&#xff0c;石门盈盛电子科技有限公司工程盛先生&#xff0c;今天向大家介绍石门盈盛电子科技有限公司的一款优势产品 - 千兆单口&#xff08;…

Java中的数据压缩和存储技术:Zip、GZip与Brotli

第1章&#xff1a;引言 大家好&#xff0c;我是小黑&#xff0c;作为一名Java程序员&#xff0c;在业务开发中&#xff0c;常常面临着一个问题&#xff1a;如何高效地处理和传输这些庞大的数据呢&#xff1f;答案就在于数据压缩技术。数据压缩&#xff0c;简而言之&#xff0c…