springboot整合springbatch批处理

springboot整合springbatch实现批处理

  • 简介
  • 项目搭建
    • 步骤

简介

项目搭建

参考博客【场景实战】Spring Boot + Spring Batch 实现批处理任务,保姆级教程

步骤

1.建表
建表sql

CREATE TABLE `student` (`id` int NOT NULL AUTO_INCREMENT,`name` varchar(100) NOT NULL COMMENT '姓名',`class_name` varchar(20) DEFAULT NULL COMMENT '班级名称',`china_score` varchar(4) DEFAULT NULL COMMENT '语文成绩',`math_score` varchar(4) DEFAULT NULL COMMENT '数学成绩',`english_score` varchar(4) DEFAULT NULL COMMENT '英语成绩',`sex` tinyint(1) NOT NULL COMMENT '性别:0-男,1-女',`birthday` date NOT NULL COMMENT '生日',`card_id` varchar(20) NOT NULL COMMENT '身份证号',`phone` varchar(20) NOT NULL COMMENT '手机号',PRIMARY KEY (`id`),UNIQUE KEY `card_id` (`card_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='学生表'

2.pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>springbatch_study</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><parent><artifactId>spring-boot-starter-parent</artifactId><groupId>org.springframework.boot</groupId><version>2.3.5.RELEASE</version></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><version>2.3.5.RELEASE</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.33</version></dependency><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.5.3</version></dependency><!--swagger页面--><dependency><groupId>com.github.xiaoymin</groupId><artifactId>knife4j-spring-boot-starter</artifactId><version>2.0.0</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-batch</artifactId><version>2.3.5.RELEASE</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><version>2.3.5.RELEASE</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.24</version></dependency><dependency><groupId>org.hibernate.validator</groupId><artifactId>hibernate-validator</artifactId><version>6.2.2.Final</version></dependency></dependencies></project>

3.启动类

@SpringBootApplication
@EnableSwagger2
public class BatchService {public static void main(String[] args) {SpringApplication.run(BatchService.class,args);}}

4.配置文件

server:port: 8081
spring:application:name: spring-batch-studydatasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://localhost:3306/test?serverTimeZone=Asia/Shanghai&characterEncoding=utf-8username: rootpassword: rootbatch:job:enabled: false #需要jobLaucher.run执行initialize-schema: never #第一次没有新建batch内置表时为always,创建内置表后设置为never

注意:spring.batch.initialize-schema第一次运行时写为always,运行后会自动生产batch内置表
5.实体类

package com.test.batch.entity;import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Data;import java.util.Date;/*** @author 1*/
@Data
public class Student {@TableId(type = IdType.AUTO)private Integer id;/*** '姓名'*/private String name;/*** '班级名称'*/private String className;/*** '语文成绩'*/private String chinaScore;/*** '数学成绩'*/private String mathScore;/*** 英语成绩*/private String englishScore;/*** '性别:0-男,1-女'*/private Integer sex;/*** '生日'*/@JsonFormat(pattern = "yyyy-MM-dd")private Date birthday;/*** '身份证号'*/private String cardId;/*** '手机号'*/private String phone;}

6.batch核心配置类

package com.test.batch.config;import com.test.batch.entity.Student;
import com.test.batch.listen.MyBeanValidator;
import com.test.batch.listen.MyJobListener;
import com.test.batch.listen.MyReaderListener;
import com.test.batch.listen.MyWriteListener;
import com.test.batch.processor.MyProcessor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.convert.ConversionService;
import org.springframework.core.convert.converter.Converter;
import org.springframework.core.convert.support.DefaultConversionService;
import org.springframework.core.io.ClassPathResource;
import org.springframework.transaction.PlatformTransactionManager;import javax.sql.DataSource;
import java.text.SimpleDateFormat;
import java.util.Date;/*** @author 1*/
@Configuration
@EnableBatchProcessing
@Slf4j
public class BatchConfig {/*** JobRepository定义及数据库的操作* @param dataSource* @param transactionManager* @return* @throws Exception*/@Beanpublic JobRepository myJobRepository(DataSource dataSource, PlatformTransactionManager transactionManager)throws Exception{JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();jobRepositoryFactoryBean.setDatabaseType("mysql");jobRepositoryFactoryBean.setTransactionManager(transactionManager);jobRepositoryFactoryBean.setDataSource(dataSource);return jobRepositoryFactoryBean.getObject();}/*** JobLauncher:job的启动器,绑定相关的Repository* @param dataSource* @param transactionManager* @return* @throws Exception*/@Beanpublic SimpleJobLauncher myJobLauncher(DataSource dataSource,PlatformTransactionManager transactionManager)throws Exception{SimpleJobLauncher jobLauncher = new SimpleJobLauncher();jobLauncher.setJobRepository(myJobRepository(dataSource,transactionManager));return jobLauncher;}/*** 定义job* @param jobBuilderFactory* @param myStep* @return*/@Beanpublic Job myJob(JobBuilderFactory jobBuilderFactory, Step myStep){return jobBuilderFactory.get("myJob").incrementer(new RunIdIncrementer()).flow(myStep).end().listener(myJobListener()).build();}/*** 注册job监听器* @return*/@Beanpublic MyJobListener myJobListener(){return new MyJobListener();}/*** 定义itemReader,读取文件数据+entity实体映射* @return*/@Beanpublic ItemReader<Student> reader(){FlatFileItemReader<Student> reader = new FlatFileItemReader<>();//设置文件路径reader.setResource(new ClassPathResource("static/student.csv"));reader.setLineMapper(new DefaultLineMapper<Student>(){{setLineTokenizer(new DelimitedLineTokenizer(){{setNames(new String[]{"name","className","chinaScore","mathScore","englishScore","sex","birthday","cardIdd","phone"});}});setFieldSetMapper(new BeanWrapperFieldSetMapper<Student>(){{setTargetType(Student.class);//设置日期转换setConversionService(createConversionService());}});}});return reader;}public ConversionService createConversionService() {DefaultConversionService conversionService = new DefaultConversionService();DefaultConversionService.addDefaultConverters(conversionService);conversionService.addConverter(new Converter<String, Date>() {@Overridepublic Date convert(String text) {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-mm-dd");Date date = new Date();try {date = sdf.parse(text);}catch (Exception e){log.error("日期转换异常 :{}",e);}return date;}});return conversionService;}/*** 注册ItemProcessor,处理数据* @return*/@Beanpublic ItemProcessor<Student,Student> processor(){MyProcessor myProcessor = new MyProcessor();myProcessor.setValidator(myBeanValidator());return myProcessor;}@Beanpublic MyBeanValidator myBeanValidator(){return new MyBeanValidator<Student>();}/*** 定义ItemWriter,指定DataSource,设置批量插入sql语句,写入数据库* @param dataSource* @return*/@Beanpublic ItemWriter<Student> writer(DataSource dataSource){JdbcBatchItemWriter<Student> writer = new JdbcBatchItemWriter<>();writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());String sql = "insert into student (name,class_name,china_score,math_score,english_score,sex,birthday,card_id,phone) " +"values (:name,:className,:chinaScore,:mathScore,:englishScore,:sex,:birthday,:cardId,:phone)";writer.setSql(sql);writer.setDataSource(dataSource);return writer;}@Beanpublic Step myStep(StepBuilderFactory factory,ItemReader<Student> reader,ItemWriter<Student> writer,ItemProcessor<Student,Student> processor){return factory.get("myStep").<Student,Student>chunk(5000).reader(reader).faultTolerant().retryLimit(3).retry(Exception.class).skip(Exception.class).skipLimit(2).listener(new MyReaderListener()).processor(processor).writer(writer).faultTolerant().skip(Exception.class).skipLimit(2).listener(new MyWriteListener()).build();}
}

7.自定义处理器

package com.test.batch.processor;import com.test.batch.entity.Student;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.validator.ValidatingItemProcessor;
import org.springframework.batch.item.validator.ValidationException;/*** @author 1*/
@Slf4j
public class MyProcessor extends ValidatingItemProcessor<Student> {private Integer GOOD = 90;private Integer  BAD = 60;@Overridepublic Student process(Student item) throws ValidationException {/*** 需要执行super.process(item)才会调用自定义校验器*/super.process(item);String chinaScore = item.getChinaScore();String mathScore = item.getMathScore();String englishScore = item.getEnglishScore();String name = item.getName();String phone = item.getPhone();if (GOOD <= Double.parseDouble(chinaScore) && GOOD <= Double.parseDouble(mathScore) && GOOD <= Double.parseDouble(englishScore)){log.info("{}同学三科成绩均为90以上,应该给予奖励", name);}if (BAD > Double.parseDouble(chinaScore) && BAD > Double.parseDouble(mathScore) && BAD > Double.parseDouble(englishScore)){log.info("{}同学三科成绩均不及格,建议通知家长,电话:{}", name,phone);}return item;}
}

8.job监听器

package com.test.batch.listen;import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;/*** @author 1*/
@Slf4j
public class MyJobListener implements JobExecutionListener {@Overridepublic void beforeJob(JobExecution jobExecution) {log.info("job开始,id:{}",jobExecution.getJobId());}@Overridepublic void afterJob(JobExecution jobExecution) {log.info("id:{}",jobExecution.getJobId());}
}

9.读组件监听器

package com.test.batch.listen;import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ItemReadListener;import static java.lang.String.format;/*** @author 1*/
@Slf4j
public class MyReaderListener implements ItemReadListener {@Overridepublic void beforeRead() {}@Overridepublic void afterRead(Object o) {}@Overridepublic void onReadError(Exception e) {log.error("读取数据失败:{}",e);log.info("item error:"+format("%s%n", e.getMessage()));}
}

10.写组件监听器

package com.test.batch.listen;import com.test.batch.entity.Student;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ItemWriteListener;import java.util.List;import static java.lang.String.format;/*** @author 1*/
@Slf4j
public class MyWriteListener implements ItemWriteListener<Student> {@Overridepublic void beforeWrite(List<? extends Student> list) {}@Overridepublic void afterWrite(List<? extends Student> list) {}@Overridepublic void onWriteError(Exception e, List<? extends Student> list) {try {log.info(format("%s%n", e.getMessage()));for (Student message : list) {log.info(format("Failed writing Students : %s", message.toString()));}} catch (Exception ex) {log.error("format error :{}",ex);}}
}

11.字段校验

package com.test.batch.listen;import org.springframework.batch.item.validator.ValidationException;
import org.springframework.batch.item.validator.Validator;
import org.springframework.beans.factory.InitializingBean;import javax.validation.ConstraintViolation;
import javax.validation.Validation;
import javax.validation.ValidatorFactory;
import java.util.Set;/*** @author 1*/
public class MyBeanValidator<T> implements Validator<T>, InitializingBean {private javax.validation.Validator validator;@Overridepublic void validate(T t) throws ValidationException {/*** 使用Validator的validate方法校验数据*/Set<ConstraintViolation<T>> constraintViolations =validator.validate(t);if (constraintViolations.size() > 0) {StringBuilder message = new StringBuilder();for (ConstraintViolation<T> constraintViolation : constraintViolations) {message.append(constraintViolation.getMessage() + "\n");}throw new ValidationException(message.toString());}}@Overridepublic void afterPropertiesSet() throws Exception {ValidatorFactory validatorFactory =Validation.buildDefaultValidatorFactory();validator = validatorFactory.usingContext().getValidator();}
}

12.接口

package com.test.batch.controller;import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.Date;/*** @author 1*/
@RestController
@Slf4j
public class TestController {@AutowiredSimpleJobLauncher launcher;@Autowiredprivate Job job;@GetMapping("testJob")public ResponseEntity testJob(){try {//job添加参数,确保每个job都唯一JobParameters jobParameters = new JobParametersBuilder().addDate("date",new Date()).toJobParameters();launcher.run(job,jobParameters);}catch (Exception e){log.error("job error:{}",e);return ResponseEntity.ok(e.getMessage());}return ResponseEntity.ok("操作成功!!!");}
}

13.数据
在这里插入图片描述
14.运行后浏览器输入
http://localhost:8081/doc.html
或页面输入localhost:8081/testJob,文件内容成功写入数据库

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/318528.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

手把手将ReactJS项目部署到Ubuntu

我的新书《Android App开发入门与实战》已于2020年8月由人民邮电出版社出版&#xff0c;欢迎购买。点击进入详情 1.构建项目 npm run build 生成build目录&#xff1a; 2.上传项目 将build目录上传到Ubuntu。 可以使用Xftp工具。 3.启动项目 npm install -g serve serve -s …

.mallox勒索病毒数据恢复|金蝶、用友、管家婆、OA、速达、ERP等软件数据库恢复

引言&#xff1a; 随着技术的不断发展&#xff0c;网络空间也不可避免地面临着各种威胁&#xff0c;其中之一就是勒索病毒&#xff0c;而.mallox是近期引起关注的一种恶意软件。本文将介绍.mallox勒索病毒&#xff0c;以及如何有效地恢复被其加密的数据文件&#xff0c;并提供…

Java后端开发——Spring实验

文章目录 Java后端开发——Spring实验一、Spring入门1.创建项目&#xff0c;Spring依赖包。2.创建JavaBean&#xff1a;HelloSpring3.编写applicationContext.xml配置文件4.测试&#xff1a;启动Spring&#xff0c;获取Hello示例。 二、Spring基于XML装配实验1.创建JavaBean类&…

Stata各版本安装指南

Stata下载链接 https://pan.baidu.com/s/1ECc2mPsfNOUUwOQC9hCcYg?pwd0531 1.鼠标右击【Stata18(64bit)】压缩包&#xff08;win11及以上系统需先点击“显示更多选项”&#xff09;【解压到 Stata18(64bit)】。 2.打开解压后的文件夹&#xff0c;鼠标右击【Setup】选择【以管…

视频如何去掉logo水印?这个几个方法记得收藏好!

在当今这个数字媒体的时代&#xff0c;视频已经渗透到我们生活的每一个角落&#xff0c;宛如一道亮丽的风景线&#xff0c;丰富着我们的生活色彩。然而&#xff0c;有些时候&#xff0c;这些视频上的logo水印&#xff0c;却像一片乌云&#xff0c;遮挡住了那原本明媚的阳光&…

印象笔记03 衍生软件使用

印象笔记03 衍生软件使用 Verse 以下内容来源于官方介绍 VERSE是一款面向未来的智能化生产力工具&#xff0c;由印象笔记团队诚意推出。 你可以用VERSE&#xff1a; 管理数字内容&#xff0c;让信息有序高效运转&#xff1b;搭建知识体系&#xff0c;构建你的强大知识库&am…

Conda:Python环境管理的瑞士军刀

在数据科学和机器学习的世界中&#xff0c;管理各种库和依赖关系的重要性不容忽视。Conda 就是为此而生的强大工具。本文将深入探讨 Conda 的简介、功能以及使用示例&#xff0c;帮助你更好地理解和使用这个工具。 Conda 简介 Conda 是一个开源的包管理系统和环境管理系统&am…

【Unity】 HTFramework框架(四十七)编辑器日志中使用超链接的技巧

更新日期&#xff1a;2024年1月3日。 Github源码&#xff1a;[点我获取源码] Gitee源码&#xff1a;[点我获取源码] 索引 日志中使用超链接超链接-网络地址超链接-本地地址超链接-项目资源文件超链接-脚本对象 日志中使用超链接 在编辑器控制台Console中的日志是支持富文本的&…

软件测试之白盒测试

概念与定义 白盒测试&#xff1a;侧重于系统或部件内部机制的测试&#xff0c;类型分为分支测试&#xff08;判定节点测试&#xff09;、路径测试、语句测试。 控制流分析(基于程序结构)&#xff1a;控制流分析是一类用于分析程序控制流结构的静态分析技术&#xff0c;目的在于…

LeetCode刷题---旋转图像

解题思路&#xff1a; 首先对主对角线两边的元素进行交换 接着走一轮遍历&#xff0c;将第1列和第n列进行交换&#xff0c;第2列和第n-1列进行交换&#xff0c;直至得到最终的矩阵。 代码实现&#xff1a; public void rotate(int[][] matrix) {//首先对主对角线的元素进行交换…

数字图像处理(3)——频域图像增强

&#x1f525;博客主页&#xff1a;是dream &#x1f680;系列专栏&#xff1a;深度学习环境搭建、环境配置问题解决、自然语言处理、语音信号处理、项目开发 &#x1f498;每日语录&#xff1a;贤才&#xff0c;难进易出&#xff1b;庸才&#xff0c;易进易初出&#xff1b;…

电子书推荐|VMware 替代与升级攻略:技术路线、产品对比与用户实践

在进行 VMware 国产化替代时&#xff0c;您是否会遇到以下问题&#xff1a; 如何实现 VMware 整体架构/部分组件替换&#xff1f;是否可以不仅“为替换而替换”&#xff0c;而是同时实现架构的升级&#xff0c;带来更多业务价值&#xff1f;哪些国产方案具备 VMware 同等能力&…