支付宝:多线程事务怎么回滚?说用 @Transactional 可以回去等通知了!

1,最近有一个大数据量插入的操作入库的业务场景,需要先做一些其他修改操作,然后在执行插入操作,由于插入数据可能会很多,用到多线程去拆分数据并行处理来提高响应时间,如果有一个线程执行失败,则全部回滚。

2,在spring中可以使用@Transactional注解去控制事务,使出现异常时会进行回滚,在多线程中,这个注解则不会生效,如果主线程需要先执行一些修改数据库的操作,当子线程在进行处理出现异常时,主线程修改的数据则不会回滚,导致数据错误。

3,下面用一个简单示例演示多线程事务。

公用的类和方法

/*** 平均拆分list方法.* @param source* @param n* @param <T>* @return*/
public static <T> List<List<T>> averageAssign(List<T> source,int n){List<List<T>> result=new ArrayList<List<T>>();int remaider=source.size()%n; int number=source.size()/n; int offset=0;//偏移量for(int i=0;i<n;i++){List<T> value=null;if(remaider>0){value=source.subList(i*number+offset, (i+1)*number+offset+1);remaider--;offset++;}else{value=source.subList(i*number+offset, (i+1)*number+offset);}result.add(value);}return result;
}
/**  线程池配置* @version V1.0*/
public class ExecutorConfig {private static int maxPoolSize = Runtime.getRuntime().availableProcessors();private volatile static ExecutorService executorService;public static ExecutorService getThreadPool() {if (executorService == null){synchronized (ExecutorConfig.class){if (executorService == null){executorService =  newThreadPool();}}}return executorService;}private static  ExecutorService newThreadPool(){int queueSize = 500;int corePool = Math.min(5, maxPoolSize);return new ThreadPoolExecutor(corePool, maxPoolSize, 10000L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(queueSize),new ThreadPoolExecutor.AbortPolicy());}private ExecutorConfig(){}
}
/** 获取sqlSession* @author 86182* @version V1.0*/
@Component
public class SqlContext {@Resourceprivate SqlSessionTemplate sqlSessionTemplate;public SqlSession getSqlSession(){SqlSessionFactory sqlSessionFactory = sqlSessionTemplate.getSqlSessionFactory();return sqlSessionFactory.openSession();}
}

示例事务不成功操作

  /*** 测试多线程事务.* @param employeeDOList*/
@Override
@Transactional
public void saveThread(List<EmployeeDO> employeeDOList) {try {//先做删除操作,如果子线程出现异常,此操作不会回滚this.getBaseMapper().delete(null);//获取线程池ExecutorService service = ExecutorConfig.getThreadPool();//拆分数据,拆分5份List<List<EmployeeDO>> lists=averageAssign(employeeDOList, 5);//执行的线程Thread []threadArray = new Thread[lists.size()];//监控子线程执行完毕,再执行主线程,要不然会导致主线程关闭,子线程也会随着关闭CountDownLatch countDownLatch = new CountDownLatch(lists.size());AtomicBoolean atomicBoolean = new AtomicBoolean(true);for (int i =0;i<lists.size();i++){if (i==lists.size()-1){atomicBoolean.set(false);}List<EmployeeDO> list  = lists.get(i);threadArray[i] =  new Thread(() -> {try {//最后一个线程抛出异常if (!atomicBoolean.get()){throw new ServiceException("001","出现异常");}//批量添加,mybatisPlus中自带的batch方法this.saveBatch(list);}finally {countDownLatch.countDown();}});}for (int i = 0; i <lists.size(); i++){service.execute(threadArray[i]);}//当子线程执行完毕时,主线程再往下执行countDownLatch.await();System.out.println("添加完毕");}catch (Exception e){log.info("error",e);throw new ServiceException("002","出现异常");}finally {connection.close();}
}

数据库中存在一条数据:

图片

图片

//测试用例
@RunWith(SpringRunner.class)
@SpringBootTest(classes = { ThreadTest01.class, MainApplication.class})
public class ThreadTest01 {@Resourceprivate EmployeeBO employeeBO;/***   测试多线程事务.* @throws InterruptedException*/@Testpublic  void MoreThreadTest2() throws InterruptedException {int size = 10;List<EmployeeDO> employeeDOList = new ArrayList<>(size);for (int i = 0; i<size;i++){EmployeeDO employeeDO = new EmployeeDO();employeeDO.setEmployeeName("lol"+i);employeeDO.setAge(18);employeeDO.setGender(1);employeeDO.setIdNumber(i+"XX");employeeDO.setCreatTime(Calendar.getInstance().getTime());employeeDOList.add(employeeDO);}try {employeeBO.saveThread(employeeDOList);System.out.println("添加成功");}catch (Exception e){e.printStackTrace();}}
}

测试结果:

图片

图片

图片

图片

可以发现子线程组执行时,有一个线程执行失败,其他线程也会抛出异常,但是主线程中执行的删除操作,没有回滚,@Transactional注解没有生效。

使用sqlSession控制手动提交事务

 @ResourceSqlContext sqlContext;/*** 测试多线程事务.* @param employeeDOList*/
@Override
public void saveThread(List<EmployeeDO> employeeDOList) throws SQLException {// 获取数据库连接,获取会话(内部自有事务)SqlSession sqlSession = sqlContext.getSqlSession();Connection connection = sqlSession.getConnection();try {// 设置手动提交connection.setAutoCommit(false);//获取mapperEmployeeMapper employeeMapper = sqlSession.getMapper(EmployeeMapper.class);//先做删除操作employeeMapper.delete(null);//获取执行器ExecutorService service = ExecutorConfig.getThreadPool();List<Callable<Integer>> callableList  = new ArrayList<>();//拆分listList<List<EmployeeDO>> lists=averageAssign(employeeDOList, 5);AtomicBoolean atomicBoolean = new AtomicBoolean(true);for (int i =0;i<lists.size();i++){if (i==lists.size()-1){atomicBoolean.set(false);}List<EmployeeDO> list  = lists.get(i);//使用返回结果的callable去执行,Callable<Integer> callable = () -> {//让最后一个线程抛出异常if (!atomicBoolean.get()){throw new ServiceException("001","出现异常");}return employeeMapper.saveBatch(list);};callableList.add(callable);}//执行子线程List<Future<Integer>> futures = service.invokeAll(callableList);for (Future<Integer> future:futures) {//如果有一个执行不成功,则全部回滚if (future.get()<=0){connection.rollback();return;}}connection.commit();System.out.println("添加完毕");}catch (Exception e){connection.rollback();log.info("error",e);throw new ServiceException("002","出现异常");}finally {connection.close();}
}
// sql
<insert id="saveBatch" parameterType="List">INSERT INTOemployee (employee_id,age,employee_name,birth_date,gender,id_number,creat_time,update_time,status)values<foreach collection="list" item="item" index="index" separator=",">(#{item.employeeId},#{item.age},#{item.employeeName},#{item.birthDate},#{item.gender},#{item.idNumber},#{item.creatTime},#{item.updateTime},#{item.status})</foreach></insert>

数据库中一条数据:

图片

图片

测试结果:抛出异常,

图片

图片

删除操作的数据回滚了,数据库中的数据依旧存在,说明事务成功了。

图片

图片

成功操作示例:

 @Resource
SqlContext sqlContext;
/*** 测试多线程事务.* @param employeeDOList*/
@Override
public void saveThread(List<EmployeeDO> employeeDOList) throws SQLException {// 获取数据库连接,获取会话(内部自有事务)SqlSession sqlSession = sqlContext.getSqlSession();Connection connection = sqlSession.getConnection();try {// 设置手动提交connection.setAutoCommit(false);EmployeeMapper employeeMapper = sqlSession.getMapper(EmployeeMapper.class);//先做删除操作employeeMapper.delete(null);ExecutorService service = ExecutorConfig.getThreadPool();List<Callable<Integer>> callableList  = new ArrayList<>();List<List<EmployeeDO>> lists=averageAssign(employeeDOList, 5);for (int i =0;i<lists.size();i++){List<EmployeeDO> list  = lists.get(i);Callable<Integer> callable = () -> employeeMapper.saveBatch(list);callableList.add(callable);}//执行子线程List<Future<Integer>> futures = service.invokeAll(callableList);for (Future<Integer> future:futures) {if (future.get()<=0){connection.rollback();return;}}connection.commit();System.out.println("添加完毕");}catch (Exception e){connection.rollback();log.info("error",e);throw new ServiceException("002","出现异常");// throw new ServiceException(ExceptionCodeEnum.EMPLOYEE_SAVE_OR_UPDATE_ERROR);}
}

测试结果:

图片

图片

数据库中数据:

删除的删除了,添加的添加成功了,测试成功。

图片

最后说一句(求关注!别白嫖!)

如果这篇文章对您有所帮助,或者有所启发的话,求一键三连:点赞、转发、在看。

关注公众号:woniuxgg,在公众号中回复:笔记  就可以获得蜗牛为你精心准备的java实战语雀笔记,回复面试、开发手册、有超赞的粉丝福利!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/422910.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2011-2022年全国各地级市互联网普及率/互联网宽带接入用户数数据

2011-2022年全国各地级市互联网宽带接入用户数/互联网普及率数据 1、时间&#xff1a;2011-2022年 2、范围&#xff1a;包括295个地级市 3、指标&#xff1a;行政区划代码、年份、地区、互联网宽带接入用户_千户、常住人口数_千人、户籍人口数_千人、每百人互联网宽带用户_常…

续签KES证书

MiniO KES&#xff08;密钥加密服务&#xff09;是 MinIO 开发的一项服务&#xff0c;旨在弥合在 Kubernetes 中运行的应用程序与集中式密钥管理服务 &#xff08;KMS&#xff09; 之间的差距。中央 KMS 服务器包含所有状态信息&#xff0c;而 KES 在需要执行与获取新密钥或更新…

docker运行redis,jdk,nginx

Redis 1.查询redis [rootlocalhost ~]# docker search redis NAME DESCRIPTION STARS OFFICIAL redis Redis is an open source key-value store that… 12620 …

网络安全全栈培训笔记(WEB攻防-51-WEB攻防-通用漏洞验证码识别复用调用找回密码重定向状态值)

第51天 WEB攻防-通用漏洞&验证码识别&复用&调用&找回密码重定向&状态值 知识点&#xff1a; 1、找回密码逻辑机制-回显&验证码&指向 2、验证码验证安全机制-爆破&复用&识别 3、找回密码客户端回显&Response状态值&修改重定向 4、…

革新区块链:代理合约与智能合约升级的未来

作者 张群&#xff08;赛联区块链教育首席讲师&#xff0c;工信部赛迪特聘资深专家&#xff0c;CSDN认证业界专家&#xff0c;微软认证专家&#xff0c;多家企业区块链产品顾问&#xff09;关注张群&#xff0c;为您提供一站式区块链技术和方案咨询。 代理合约&#xff08;Prox…

地方债务余额数据,Shp、excel格式,2008-2020年,含公共财政收入、支出、负债率等多个字段

基本信息&#xff1a; 数据名称: 地方债务余额数据 数据格式: Shp、excel 数据时间: 2008-2020年 数据几何类型: 面 数据坐标系: WGS84 数据来源&#xff1a;网络公开数据 数据字段&#xff1a; 序号字段名称字段说明1zfzqsl地方政府债-债券数量(只)2zfzqye地方政府…

顶顶通呼叫中心中间件如何实现自己呼叫自己并且放音:一步步配置(mod_cti基于FreeSWITCH)

介绍 顶顶通呼叫中心中间件如何实现自己呼叫自己并且放音&#xff1a;一步步配置 一、配置acl.conf 打开ccadmin-》点击配置文件并且打开acl.conf-》配置好了点击提交XML。 注意&#xff1a;acl.conf的服务器IP必须是内网IP 添加了之后在运维调试输入reloadacl 在运维调试执…

w23靶场安装

一、实验环境 服务器&#xff1a;phpstudyv8.1.13 靶场&#xff1a;Bees二、实验目的 提供一个靶场环境 三、实验步骤 bees靶场安装 1.启动小皮的apache和mysql 2.在小皮V8.1.1.3版本上创建bees网站&#xff0c;选择的php版本最好在5.x&#xff0c;不然会有php解析错误。…

739.每日温度 496.下一个更大元素 I

739.每日温度 496.下一个更大元素 I 739.每日温度 力扣题目链接(opens new window) 请根据每日 气温 列表&#xff0c;重新生成一个列表。对应位置的输出为&#xff1a;要想观测到更高的气温&#xff0c;至少需要等待的天数。如果气温在这之后都不会升高&#xff0c;请在该位…

【Android】TypedArray的使用

介绍 看电池电量组件BatteryMeterView的时候看到的。 Array是个数组&#xff0c;所有TypedArray也是个容器&#xff0c;基本是用于自定义View里面的&#xff08;至少我目前见过的全部都在自定义View里面&#xff09;。 使用 1.自定义View public class RoundSeekbarView e…

陪玩系统:最新商业版游戏陪玩语音聊天系统3.0商业升级独立版本源码

首发价值29800元的最新商业版游戏陪玩语音聊天系统3.0商业升级独立版本源码 &#xff08;价值29800&#xff09;最新陪玩3.0独立版本 &#xff0c;文件截图 结尾将会附上此系统源码以及详细搭建教程包含素材图仅用于学习使用 陪玩系统3.0独立升级版正式发布&#xff0c;此版本…

计算机视觉的应用

计算机视觉&#xff08;Computer Vision&#xff09;是一门研究如何让计算机能够理解和分析数字图像或视频的学科。简单来说&#xff0c;计算机视觉的目标是让计算机能够像人类一样对视觉信息进行处理和理解。为实现这个目标&#xff0c;计算机视觉结合了图像处理、机器学习、模…