双异步系列完结撒花,如何解決异步事务问题?

在这里插入图片描述

目录

    • 一、前情提要
      • 原始需求:读取一个10万行的Excel
      • 优化1:[**使用双异步后,从 191s 优化到 2s**](https://blog.csdn.net/guorui_java/article/details/135143234)
      • 优化2:[**使用双异步后,如何保证数据一致性?**](https://blog.csdn.net/guorui_java/article/details/135705565)
      • 优化3:[**获取双异步返回值时,如何保证主线程不阻塞?**](https://blog.csdn.net/guorui_java/article/details/135708663)
    • 二、异步某线程失败时,主线程回滚所有异步线程的事务!
    • 三、@Transactional注解
      • 1、@Transactional
      • 2、@Transactional(rollbackFor = Exception.class)
    • 四、注解失效问题
      • 1、@Transactional 应用在非 public 修饰的方法上
      • 2、@Transactional 注解属性 rollbackFor 设置错误
      • 3、同一个类中方法调用,导致@Transactional失效
      • 4、捕获异常
    • 五、通过Future获取异步返回值,添加事务
      • 1、添加事务
        • (1)添加事务 + 不开启异步
        • (2)添加事务 + 开启异步
      • 2、手动添加事务
        • (1)添加事务 + 不开启异步
        • (2)Future获取异步返回值,添加手动事务,异常回滚失败!
    • 六、@async + @Transactional 事务失效问题
    • 七、Spring 的事务传播机制是不能够跨线程的
      • 1、一个异步线程一个事务,然后根据结果统一提交/回滚?
      • 2、核心代码
      • 3、异步线程类
      • 4、事务复制类
      • 5、为何要用事务复制类?而最后提交和回滚的时候也没用它?
    • 八、总结
      • 读取一个10万行的Excel的最佳解决方案是:

大家好,我是哪吒。

一、前情提要

在上一篇文章中,我们通过双异步的方式导入了10万行的Excel,有个小伙伴在评论区问我,如果保证事务呢,如果分批的话。

原始需求:读取一个10万行的Excel

通过串行读取Excel,单个Excel耗时191s。

在这里插入图片描述

优化1:使用双异步后,从 191s 优化到 2s

  1. 分别通过POI和EasyExcel的方式读取Excel并插入数据库;
  2. 探讨了“线程池中的核心线程数设置问题”;
  3. 经过数十次的测试,总结了通过线程池的方式,争取一次性并行入库,效率最佳。

在这里插入图片描述

优化2:使用双异步后,如何保证数据一致性?

通过Future获取异步返回值,再和Excel文件数据行进行比较,实现对数据准确性的判断!

  1. 逐行分析了FutureTask源码,绘制了FutureTask执行流程图;
  2. 分析get()源码,绘制get()方法执行流程图;
  3. 但是,发现了一个问题,Future.get()会造成主线程的阻塞。

在这里插入图片描述

优化3:获取双异步返回值时,如何保证主线程不阻塞?

Java8中引入了CompletableFuture,它实现了对Future的全面升级,可以通过回调的方式,获取异步线程返回值。

CompletableFuture的异步执行通过ForkJoinPool实现, 它使用守护线程去执行任务。

ForkJoinPool在于可以充分利用多核CPU的优势,把一个任务拆分成多个小任务,把多个小任务放到多个CPU上并行执行,当多个小任务执行完毕后,再将其执行结果合并起来。

  1. 通过CompletableFuture优化 “通过Future获取异步返回值”;
  2. CompletableFuture和Future的效率对比;
  3. 自定义ForkJoinPool线程池;
  4. 核心线程数相同的情况下,CompletableFuture的入库效率要优于Future的入库效率,10万条数据大概要快4秒钟;
  5. 通过CompletableFuture.allOf解决阻塞主线程问题;
  6. 总结了CompletableFuture中花俏的语法糖。

二、异步某线程失败时,主线程回滚所有异步线程的事务!

想要保证事务,肯定是使用@Transactional来实现。

现在的场景是导入若干个大的Excel文件数据,因为每个Excel导入的表不同,所以只要保证单Excel的事务即可。

上文中,是使用异步批量读取并插入的方式实现的Excel文件入库。

也就是说,1个主线程事务 + 若干个子线程事务,我们想要保证单Excel的插入事务,所有异步子线程有任何一个报错,都要进行事务回滚,如果全部都没报错,则进行事务提交。

这个时候,有的小伙伴可能会想到,主线程加个@Transactional注解,所有子线程分别加@Transactional注解,就可以了吧?

但是,这样是不行的,子线程的异常只会回滚其自身的事务。

如果Excel中有10万条数据,一次插入4200条数据,最后一次插入3400条。如果其它线程都插入成功了,最后一个报错了,此时,数据库中还是会有96600条数据插入成功,与单Excel的事务需求不符。

通过代码模拟这种情况:

if(end == sheet.getLastRowNum()){logger.info("插入最后一批数据,模拟异常");int a = 1/0;
}

在这里插入图片描述
在这里插入图片描述

三、@Transactional注解

声明式事务管理建立在AOP之上的。其本质是对方法前后进行拦截,然后在目标方法开始之前创建或者加入一个事务,在执行完目标方法之后根据执行情况提交或者回滚事务。

简而言之,@Transactional注解在代码执行出错的时候能够进行事务的回滚。

  1. 在启动类上添加@EnableTransactionManagement注解。
  2. 用于类上时,该类的所有 public 方法将都具有该类型的事务属性,同时,我们也可以在方法级别使用该标注来覆盖类级别的定义。
  3. 在项目中,@Transactional(rollbackFor=Exception.class),如果类加了这个注解,那么这个类里面的方法抛出异常,就会回滚,数据库里面的数据也会回滚。
  4. 在@Transactional注解中如果不配置rollbackFor属性,那么事物只会在遇到RuntimeException的时候才会回滚,加上rollbackFor=Exception.class,可以让事物在遇到非运行时异常时也回滚。

1、@Transactional

使用@Transactional后,当程序发生RuntimeException运行时异常在没有使用try,catch进行捕获的时候,程序都会中止,当程序发生中止,则会触发数据库的回滚。

当使用了trycatch进行捕获到这个异常,假如在catch中加入了throw e抛出异常,则程序中止,数据库回滚。

加入在try catch中没有throw e 抛出异常,只是简单的打印异常,则异常被捕获未抛出异常去终止程序,在trycatch中的操作数据库语句插入失败,在trycatch上面和下面的数据库相关插入语句成功,也就是程序成功跑完,数据库不会发生回滚。

2、@Transactional(rollbackFor = Exception.class)

在@Transactional注解中如果不配置rollbackFor属性,那么事物只会在遇到RuntimeException的时候才会回滚,加上rollbackFor=Exception.class,可以让事物在遇到非运行时异常时也回滚。

四、注解失效问题

1、@Transactional 应用在非 public 修饰的方法上

事务拦截器在目标方法执行前后进行拦截,内部会调用方法来获取Transactional 注解的事务配置信息,调用前会检查目标方法的修饰符是否为 public,不是 public则不会获取@Transactional 的属性配置信息。

2、@Transactional 注解属性 rollbackFor 设置错误

rollbackFor 可以指定能够触发事务回滚的异常类型。

Spring默认抛出了未检查unchecked异常(继承自 RuntimeException 的异常)或者 Error才回滚事务;其他异常不会触发回滚事务。

如果在事务中抛出其他类型的异常,但却期望 Spring 能够回滚事务,就需要指定rollbackFor属性。

3、同一个类中方法调用,导致@Transactional失效

开发中避免不了会对同一个类里面的方法调用,比如有一个类Test,它的一个方法A,A再调用本类的方法B(不论方法B是用public还是private修饰),但方法A没有声明注解事务,而B方法有。则外部调用方法A之后,方法B的事务是不会起作用的。这也是经常犯错误的一个地方。

那为啥会出现这种情况?其实这还是由于使用Spring AOP代理造成的,因为只有当事务方法被当前类以外的代码调用时,才会由Spring生成的代理对象来管理。

在同一个类中调用异步方法,等于调用this本类的方法,没有走Spring生成的代理类,也就不会让他异步执行,@Transactional的原理也类似。

4、捕获异常

如果你手动的catch捕获这个异常并进行处理,事务管理器会认为当前事务应该正常commit,就会导致注解失效,如果非要捕获且不失效,就必须在代码块内throw new Exception抛出异常。

五、通过Future获取异步返回值,添加事务

1、添加事务

@Transactional(rollbackFor = Exception.class)
public void readXls(String filePath, String filename) throws Exception{try {// 省略一些复杂操作...List<Future<Integer>> futureList = new ArrayList<>();for (int time = 0; time < times; time++) {Future<Integer> sumFuture = readExcelDataAsyncFutureService.readXlsCacheAsyncMybatis();futureList.add(sumFuture);}// 主线程获取Future返回值boolean futureFlag = getFutureResult(futureList, excelRow);if (futureFlag) {logger.info("readXlsCacheAsync---插入数据成功,提交事务");} else {TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();logger.info("readXlsCacheAsync---插入数据失败,回滚事务");}} catch (Exception e) {TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();logger.error("readXlsCacheAsync---插入数据异常,回滚事务:", e);}
}@Async("async-executor")//是否开启异步
@Override
public Integer readXlsCacheAsyncMybatis() {try {// 省略一些复杂操作...}catch (Exception e){throw new RuntimeException("插入数据库异常", e);}
}
(1)添加事务 + 不开启异步

如果入库异常,事务回滚成功

在这里插入图片描述

(2)添加事务 + 开启异步

回滚失败!

在这里插入图片描述

2、手动添加事务

public void readXls(String filePath, String filename) throws Exception{// 手动开启事务,不自动提交TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);try {// 省略一些复杂操作...List<Future<Integer>> futureList = new ArrayList<>();for (int time = 0; time < times; time++) {Future<Integer> sumFuture = readExcelDataAsyncFutureService.readXlsCacheAsyncMybatis();futureList.add(sumFuture);}// 主线程获取Future返回值boolean futureFlag = getFutureResult(futureList, excelRow);if (futureFlag) {dataSourceTransactionManager.commit(transactionStatus); // 提交logger.info("readXlsCacheAsync---插入数据成功,提交事务");} else {dataSourceTransactionManager.rollback(transactionStatus);// 回滚logger.info("readXlsCacheAsync---插入数据失败,回滚事务");}} catch (Exception e) {dataSourceTransactionManager.rollback(transactionStatus);// 回滚logger.error("readXlsCacheAsync---插入数据异常,回滚事务:", e);}
}@Async("async-executor")//是否开启异步
@Override
public Integer readXlsCacheAsyncMybatis() {try {// 省略一些复杂操作...}catch (Exception e){throw new RuntimeException("插入数据库异常", e);}
}
(1)添加事务 + 不开启异步

如果入库异常,事务回滚成功

在这里插入图片描述

(2)Future获取异步返回值,添加手动事务,异常回滚失败!

在这里插入图片描述

六、@async + @Transactional 事务失效问题

回顾一下需求:异步某线程失败时,主线程回滚所有异步线程的事务!

是代码有问题,还是就是实现不了呢?

@Async和@Transactional注解都是通过Spring aop实现的,核心都是靠着关键的MethodInterceptor实现,@Async会给对应bean代理对象中放入一个AnnotationAsyncExecutionInterceptor拦截器,而@Transactional会给对应bean的代理对象中放入一个TransactionInterceptor拦截器。

Spring事务管理的传播机制是使用 ThreadLocal 实现的。因为 ThreadLocal 是线程私有的,所以 Spring 的事务传播机制是不能够跨线程的。

七、Spring 的事务传播机制是不能够跨线程的

1、一个异步线程一个事务,然后根据结果统一提交/回滚?

2、核心代码

/*** 数据源事务管理器*/
private DataSourceTransactionManager dataSourceTransactionManager;@Autowired
public void setUserService(DataSourceTransactionManager dataSourceTransactionManager) {this.dataSourceTransactionManager = dataSourceTransactionManager;
}@Override
public void readXls(String filePath, String filename) {List<TransactionStatus> transactionStatusList = Collections.synchronizedList(new ArrayList<>());List<TransactionResource> transactionResourceList = Collections.synchronizedList(new ArrayList<>());try {List<Future<Integer>> futureList = new ArrayList<>();for (int time = 0; time < times; time++) {Future<Integer> sumFuture = readAsyncFutureTransactionDBService.readXlsCacheAsyncMybatis(sheet, row, start, end, insertBuilder,transactionStatusList,transactionResourceList);futureList.add(sumFuture);}// 主线程获取Future返回值boolean futureFlag = getFutureResult(futureList, excelRow);if (futureFlag) {for (int i = 0; i < transactionStatusList.size(); i++) {TransactionStatus transactionStatus = transactionStatusList.get(i);dataSourceTransactionManager.commit(transactionStatus); // 提交}logger.info("readXlsCacheAsync---插入数据成功,提交事务");} else {for (int i = 0; i < transactionStatusList.size(); i++) {TransactionStatus transactionStatus = transactionStatusList.get(i);dataSourceTransactionManager.rollback(transactionStatus);// 回滚}logger.info("readXlsCacheAsync---插入数据失败,事务回滚");throw new RuntimeException("readXlsCacheAsync---插入数据异常,异常事务回滚");}} catch (Exception e) {logger.error("readXlsCacheAsync---插入数据异常,事务回滚:", e);for (int i = 0; i < transactionStatusList.size(); i++) {TransactionStatus transactionStatus = transactionStatusList.get(i);dataSourceTransactionManager.rollback(transactionStatus);// 回滚}//connection.rollback();throw new RuntimeException("readXlsCacheAsync---插入数据异常,异常事务回滚");}
}

3、异步线程类

@Async("async-executor")
@Override
public Future<Integer> readXlsCacheAsyncMybatis(XSSFSheet sheet, XSSFRow row, int start, int end, StringBuilder insertBuilder,List<TransactionStatus> transactionStatusList,List<ReadAsyncFutureTransactionServiceImpl.TransactionResource> transactionResourceList) throws Exception {DefaultTransactionDefinition defaultTransactionDefinition = new DefaultTransactionDefinition();TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(defaultTransactionDefinition);// 开启新事务transactionStatusList.add(transactionStatus);// copy事务资源transactionResourceList.add(ReadAsyncFutureTransactionServiceImpl.TransactionResource.copyTransactionResource());try {// 入库操作}catch (Exception e){throw new RuntimeException("readXlsCacheAsyncMybatis分批异步读取Excel,通过Mybatis插入数据库异常");}
}

4、事务复制类

/*** 保存当前事务资源,用于线程间的事务资源COPY操作* <p>* `@Builder`注解是Lombok库提供的一个注解,它可以用于自动生成Builder模式的代码,使用@Builder注解可以简化创建对象实例的过程,并且可以使代码更加清晰和易于维护*/
static class TransactionResource {// TransactionSynchronizationManager类内部默认提供了下面六个ThreadLocal属性,分别保存当前线程对应的不同事务资源// 保存当前事务关联的资源,默认只会在新建事务的时候保存当前获取到的DataSource和当前事务对应Connection的映射关系// 当然这里Connection被包装为了ConnectionHolder// 事务结束后默认会移除集合中的DataSource作为key关联的资源记录private Map<Object, Object> resources;//下面五个属性会在事务结束后被自动清理,无需我们手动清理// 事务监听者,在事务执行到某个阶段的过程中,会去回调监听者对应的回调接口(典型观察者模式的应用),默认为空集合private Set<TransactionSynchronization> synchronizations;// 存放当前事务名字private String currentTransactionName;// 存放当前事务是否是只读事务private Boolean currentTransactionReadOnly;// 存放当前事务的隔离级别private Integer currentTransactionIsolationLevel;// 存放当前事务是否处于激活状态private Boolean actualTransactionActive;/*** 对事务资源进行复制** @return TransactionResource*/public static TransactionResource copyTransactionResource() {return TransactionResource.builder()//返回的是不可变集合.resources(TransactionSynchronizationManager.getResourceMap())//如果需要注册事务监听者,这里记得修改,我们这里不需要,就采用默认负责,spring事务内部默认也是这个值.synchronizations(new LinkedHashSet<>()).currentTransactionName(TransactionSynchronizationManager.getCurrentTransactionName()).currentTransactionReadOnly(TransactionSynchronizationManager.isCurrentTransactionReadOnly()).currentTransactionIsolationLevel(TransactionSynchronizationManager.getCurrentTransactionIsolationLevel()).actualTransactionActive(TransactionSynchronizationManager.isActualTransactionActive()).build();}/*** 使用*/public void autoWiredTransactionResource() {resources.forEach(TransactionSynchronizationManager::bindResource);//如果需要注册事务监听者,这里记得修改,我们这里不需要,就采用默认负责,spring事务内部默认也是这个值TransactionSynchronizationManager.initSynchronization();TransactionSynchronizationManager.setActualTransactionActive(actualTransactionActive);TransactionSynchronizationManager.setCurrentTransactionName(currentTransactionName);TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(currentTransactionIsolationLevel);TransactionSynchronizationManager.setCurrentTransactionReadOnly(currentTransactionReadOnly);}/*** 移除*/public void removeTransactionResource() {// 事务结束后默认会移除集合中的DataSource作为key关联的资源记录// DataSource如果重复移除,unbindResource时会因为不存在此key关联的事务资源而报错resources.keySet().forEach(key -> {if (!(key instanceof DataSource)) {TransactionSynchronizationManager.unbindResource(key);}});}
}

5、为何要用事务复制类?而最后提交和回滚的时候也没用它?

如何不加会怎么样?

在提交和回滚的时候,会出现异常:

在这里插入图片描述

八、总结

经过不懈的努力,终于解决了“异步某线程失败时,主线程回滚所有异步线程的事务!”这个看起来很简单的问题。

也是对双异步入库系列的一个完结。

通过添加事务,可以有效的控制Excel异步插入数据的准确性。

在这里插入图片描述

读取一个10万行的Excel的最佳解决方案是:

  1. 通过EasyExcel异步读取Excel;
  2. 通过Future获取异步返回值,比较Excel行数和入库数,保证数据入库一致性;
  3. 通过CompletableFuture + 自定义ForkJoinPool线程池的方式执行,解决主线程阻塞问题;
  4. 根据核心线程数,设置每个线程读取的Excel数据行数,以达到效率最佳;
  5. 通过手动添加事务 + 一个线程一个事务 + 复制事务的方式实现异步事务的有效控制。

🏆文章收录于:100天精通Java从入门到就业

全网最细Java零基础手把手入门教程,系列课程包括:Java基础、Java8新特性、Java集合、高并发、性能优化等,适合零基础和进阶提升的同学。

🏆哪吒多年工作总结:Java学习路线总结,搬砖工逆袭Java架构师

华为OD机试 2023B卷题库疯狂收录中,刷题点这里

刷的越多,抽中的概率越大,每一题都有详细的答题思路、详细的代码注释、样例测试,发现新题目,随时更新,全天CSDN在线答疑。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/444378.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

“福气满满过新年,爱心关怀暖人心”活动

为向困境人群送去节日温暖&#xff0c;传递政府对他们的关心、关注&#xff0c;同时送上新春祝福。2024年1月29日至30日&#xff0c;在范县镇政府的组织下、城关镇民政所的带领下&#xff0c;范县城关镇社工协助各县直单位、镇政府、村委会发放本辖区内新春物资&#xff0c;为辖…

【 USRP 相控阵】X波段相控阵开发平台用户指南

包装 一共三件。 1、AD9081-FMCA-EBZ AD9081 MxFE Evaluation Board, https://www.analog.com/eval-ad9081 AD9081 的全功能评估板使用 ACE 软件进行控制的 PC 软件HMC7044 的板载时钟用于管理套件和 FPGA 时钟选择切换到外部直接时钟 AD9081-FMCA-EBZ 评估板包括以各种模…

asp.net 404页面配置、 asp.net MVC 配置404页面、iis 配置404页面,指定404错误页面,设置404错误页面

通过标题的三个问题 1、asp.net 404页面配置、 2、asp.net MVC 配置404页面、 3、iis 配置404页面&#xff1b; 可以看出&#xff0c;这是一篇了不得的问题&#xff0c;并进行全面讲解&#xff1b; 除了围绕以上三个核心问题外&#xff0c;我们也对以下2个核心场景也作出分析…

JAVA可变参数

题目引出&#xff1a;在以前我们是这样做的&#xff1a;帮我们要求和的数据写在数组内即可 public class Test01 {public static void main(String[] args) {int []arr{1,2,3,4,5,6,7,8,9,10};int sum getSum(arr);System.out.println(sum);}public static int getSum( int […

Sketch 99.5中文 优秀的网站和移动应用设计软件

Sketch for mac用于数字世界的图形设计。在一个屡获殊荣的软件包中提供强大的工具和优雅的界面。因为做美丽的事情应该是一种快乐&#xff0c;而不是负担。 软件下载&#xff1a;Sketch 99.5中文激活版下载 Sketch支持每层多个填充&#xff0c;边框和阴影&#xff1b;具有强大的…

新春营销不间断,AI 整活更省心

新年、春节历来都是营销的大热节点&#xff0c;各种好物集、年货节、送礼清单比比皆是。这些新鲜玩法的背后是大量的品牌内容「弹药库」。 然而&#xff0c;品牌想在竞争激烈的新春季刷满存在感&#xff0c;并非易事。一方面&#xff0c;节日期间&#xff0c;消费者对于内容的审…

RT-Thread:STM32的PB3,PB4 复用IO配置为GPIO

说明&#xff1a;在使用 STM32F103CBT6 配置了 PB3 为IO&#xff0c;测试时发现读取这个IO的电平时钟是0&#xff0c;即便单管脚上的电平是1&#xff0c;读取的数据任然是0,查规格书后发现PB3,PB4是JTAG复用口&#xff0c;要当普通IO用需要配置。 配置工具&#xff1a;STM32Cu…

免费AI写作网站,AI人工智能写作gpt+在线AI绘画midjourney国内版

大家可以通过收藏网页www.woka.chat 直接进行访问&#xff0c;也可通过关注新公众号实现微信端使用~ 注册赠送大量额度&#xff0c;可用于网站全部功能&#xff08;问答和绘画&#xff09;&#xff01;每天签到也可领取充足使用额度&#xff01; 废话不多说&#xff0c;我们现…

成功解决AttributeError: ‘str‘ object has no attribute ‘keys‘

成功解决AttributeError: ‘str’ object has no attribute ‘keys’。 &#x1f335;文章目录&#x1f335; &#x1f333;引言&#x1f333;&#x1f333;报错分析及解决方案&#x1f333;&#x1f333;字典对象的keys方法&#x1f333;&#x1f333;结尾&#x1f333; &…

用的到的linux-文件移动-Day2

前言&#xff1a; 在上一节&#xff0c;我们复习了cd大法和创建生成文件和文件夹的方法&#xff0c;介绍了一些“偷懒”&#xff08;高效&#xff09;的小技巧&#xff0c;本节&#xff0c;我们一起来探讨下&#xff0c;我们对文件移动操作时有哪些可以偷懒的小技巧~ 一、复制…

Django知识随笔

目录 1.如何再ajax中传输post数据&#xff1f; 2.在form表单中使用jquery序列化&#xff0c;input框过多。 1.如何再ajax中传输post数据&#xff1f; 在ajax传递的那个网址&#xff0c;会调用你路由的视图函数&#xff0c;在视图函数上面加一句 csrf_exempt 。写上之后会有提…

企业股权结构API:为金融机构提供全面的企业背景调查服务

摘要 在当今快速变化的商业环境中&#xff0c;金融机构面临着日益复杂的风险管理挑战。为了做出明智的投资和信贷决策&#xff0c;深入了解企业的股权结构和实际控制人信息变得至关重要。企业股权结构API作为一种创新工具&#xff0c;为金融机构提供了一种高效、便捷的途径&am…