【探讨】批量操作以及多线程下保证事务的一致性

news/2024/11/29 22:37:14/文章来源:https://www.cnblogs.com/kukuxjx/p/18577623

1  前言

假如给你一个场景,有一批1万或者10万的数据,让你插入到数据库中怎么做呢?我们这节来看看。

首先一点我们单纯的 一个个 INSERT 语句,我们就不试了,这一个个的肯定慢,我们这里统一用 INSERT INTO 表(字段1,字段2) VALUES(值1,值2),(值11,值22),(值111,值222);这种方式分批跑高效点。

2  实践

2.1  循环批量在一个事务

首先我们看一个简单的,就是在一个事务里,一个线程循环执行:

@Transactional(rollbackFor = Exception.class)
public void batchSave() {// 分批// 至于分多少批:PgSQL 的占位符个数是有限制的 不能超过 Short.MAX(32767)// 所以一批最多 = 32767 / 你的一行字段个数// 比如我这里 = 32767 / 66个字段 = 496 也就是一批最多496个数据 我这里分450个List<List<OrderPo>> partition = Lists.partition(list, 450);StopWatch stopWatch = new StopWatch();stopWatch.start();// 顺序插入for (List<OrderPo> sub : partition) {orderMapper.batchSave(sub);}stopWatch.stop();log.info("耗时:" + stopWatch.getTotalTimeSeconds());
}

执行的效果:

// 1万条数据的耗时:
10000  6.9006843
10000  6.7756503
10000  5.7293283
10000  5.5923225
10000  5.818869    
10000  5.8303096
// 10万条数据的耗时:
100000 58.5615497
100000 58.0595869
100000 58.5441196
100000 58.3003101
100000 57.1142761
100000 54.5769813
100000 53.8146378

这种方式最大的优点就是简单、纯粹,中间有出错,事务回滚,最大的缺点也是比较明显就是慢。

2.2  利用线程池并行插入

为了加快查询,我们引入线程池插入,也就是分批后交给各个线程并行插入:

// 线程池
private static final ThreadPoolExecutor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(8, 16, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024), new ThreadFactory() {// 线程名字private final String PREFIX = "BATCH_INSERT_";// 计数器private AtomicLong atomicLong = new AtomicLong();@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(null, r, PREFIX + atomicLong.incrementAndGet());return thread;}
});
@SneakyThrows
@Transactional(rollbackFor = Exception.class)
public void batchSave() {// 分批// 至于分多少批:PgSQL 的占位符个数是有限制的 不能超过 Short.MAX(32767)// 所以一批最多 = 32767 / 你的一行字段个数// 比如我这里 = 32767 / 66个字段 = 496 也就是一批最多496个数据List<List<OrderPo>> partition = Lists.partition(list, 450);CountDownLatch countDownLatch = new CountDownLatch(partition.size());StopWatch stopWatch = new StopWatch();stopWatch.start();// 顺序插入for (List<OrderPo> sub : partition) {THREAD_POOL_EXECUTOR.execute(() -> {try {log.info("线程:{}开始处理", Thread.currentThread().getName());orderMapper.batchSave(sub);} finally {countDownLatch.countDown();}});}// 等待插入完毕
    countDownLatch.await();stopWatch.stop();log.info("耗时:" + stopWatch.getTotalTimeSeconds());
}

看下执行效果:

// 1万条数据的耗时:
10000  4.6711569
10000  4.4839416
10000  4.4310133
10000  4.3802914
10000  3.8440867   
10000  4.0849564
// 10万条数据的耗时:
100000 37.6524237
100000 35.1318877
100000 36.6338523
100000 36.4448236
100000 35.3499332
100000 36.0569744
100000 34.2736072

这种方式,优点是相对快了,但是缺点:事务下降到每个线程里了,可能会存在某个线程成功了,某个失败了,导致会存在数据丢,并且当并发比较高的时候,线程池队列满了呢?以及当前是阻塞的,await 会一致等,假如要加上等待时间,那等待时间设置多少呢?都是要考量的。

2.3  线程池并行插入但共用一个事务

可以将上边的多线程共用到一个事务里,也就是不再用声明式事务,我们可以用编程式事务,并且要让他们共用一个事务的话,其实说白了就是要共用一个数据库连接,可以参考我前的【Spring】【Mybatis】【事务】Spring + MyBaits + 事务 三者是如何协调的呢?(从一个数据库连接串一串 Spring、Mybatis、事务的联系)、【Spring】【Mybatis】【Dynamic-Datasource】【事务】Spring + MyBaits + 事务 + 动态数据源 四者是如何协调的呢?(从一个数据库连接串一串四者的联系),我这里实现方式如下:

// 线程池
private static final ThreadPoolExecutor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(8, 16, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024), new ThreadFactory() {// 线程名字private final String PREFIX = "BATCH_INSERT_";// 计数器private AtomicLong atomicLong = new AtomicLong();@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(null, r, PREFIX + atomicLong.incrementAndGet());return thread;}
});
@SneakyThrows
public void batchSave() {// 分批// 至于分多少批:PgSQL 的占位符个数是有限制的 不能超过 Short.MAX(32767)// 所以一批最多 = 32767 / 你的一行字段个数// 比如我这里 = 32767 / 66个字段 = 496 也就是一批最多496个数据List<List<OrderPo>> partition = Lists.partition(list, 450);// 手动事务提前创建出来DefaultTransactionDefinition transactionDefinition = new DefaultTransactionDefinition();transactionDefinition.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);// 提前获取连接TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);// 获取数据源以及连接 供多线程使用DataSource dataSource = dataSourceTransactionManager.getDataSource();Object resource = TransactionSynchronizationManager.getResource(dataSource);// 异常标志AtomicBoolean exceptionFlag = new AtomicBoolean(false);boolean poolExceptionFlag = false;// 计数器等待执行完毕CountDownLatch countDownLatch = new CountDownLatch(partition.size());StopWatch stopWatch = new StopWatch();stopWatch.start();// 顺序插入for (List<OrderPo> sub : partition) {try {THREAD_POOL_EXECUTOR.execute(() -> {try {// 如果没有发生异常if (exceptionFlag.get()) {log.info("有其他线程执行失败,后续无需执行,因为最终会回滚");return;}// 释放上次绑定的数据源连接try {TransactionSynchronizationManager.unbindResource(dataSource);} catch (Exception ignored){}// 装上本次使用的连接
                    TransactionSynchronizationManager.bindResource(dataSource, resource);log.info("线程:{}开始处理", Thread.currentThread().getName());// 执行插入
                    orderMapper.batchSave(sub);// 模拟异常if (ThreadLocalRandom.current().nextInt(3) == 1) {int i = 1/0;}} catch (Exception e) {// 发生异常设置异常标志log.error(String.format("线程:%s我发生了异常,e:%s", Thread.currentThread().getName(), e.getMessage()), e);exceptionFlag.set(true);} finally {// 不管是成功还是失败 都要计数器 -1
                    countDownLatch.countDown();}});} catch (Exception e) {// 提交任务失败 那就是失败了exceptionFlag.set(true);log.info("当前线程池繁忙,请稍后重试");dataSourceTransactionManager.rollback(transactionStatus);poolExceptionFlag = true;break;}}// 等待执行完毕  这里有个隐患  等待多长时间呢? 线程池任务过多的话最严重的情况 就是一直要在这里阻塞// 因为事务的提交还是回滚都交给了 主任务线程// 如果提交到线程池都成功了的话 就等待都执行完if (!poolExceptionFlag) {countDownLatch.await();}// 异常标志来做提交还是回滚if (exceptionFlag.get()) {// 发生异常 回滚
        dataSourceTransactionManager.rollback(transactionStatus);} else {// 未发生异常 可以提交
        dataSourceTransactionManager.commit(transactionStatus);}stopWatch.stop();log.info("耗时:" + stopWatch.getTotalTimeSeconds());
}

这种方式相对于上边一种,事务是共用到一个事务了,但是用到线程池以及队列满了如何呢?以及阻塞当前线程的问题。

2.4  批量任务表

那么基于这种批量操作,我们是不是可以建立两张表,思路如下:

至于如何异步执行每个明细,我们可以用 XXL-JOB定时去捞执行失败或者未执行的任务,如果任务数量比较多的话,捞出来通过发送 MQ 均摊的方式处理掉。

3  小结

大家要是有更好的思路胡总和有哪里理解不对的地方,还请指正哈。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/843716.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Windows mstsc 远程桌面链接 ubuntu 18.04 远程图形桌面

前言全局说明通常情况下,管理 Ubuntu 服务器都是用命令行界面,但某些时候,可能会用到图形界面。2204安装方法:https://www.cnblogs.com/wutou/p/18430133 命令行安装图形界面:https://www.cnblogs.com/wutou/p/18572907一、说明 环境: Ubuntu 18.04.6 LTS (Linux qt-vm 5…

[杂题]2024.9~2024.11 杂题总结

[杂题]2024.9~2024.11 杂题总结 题目做多了,不总结,和没做是一样的。 ARC061B 挺好的一道题。观察到三个不好做,我们想能否搞成一个牌堆去取。发现显然是可以的,我们只需要知道一个确定的取出来牌的编号序列,必然可以确定三者的牌堆分别是什么。 所以,问题转换成了:有多…

E. Photoshoot for Gorillas(Codeforces Round 966 (Div. 3))

https://codeforces.com/contest/2000/problem/E 题目描述 你非常喜欢屮大猩猩,于是你决定为它们组织一次拍摄活动。大猩猩生活在丛林中,丛林被表示为一个有 n 行 m 列的网格,有 w 个大猩猩同意参与拍摄,第 i 个大猩猩的身高ai .你希望将所有大猩猩放置在网格的单元格中,并…

windows版lammps的安装和计算

1.安装:详情见:https://mp.weixin.qq.com/s/xwx0c2ATNM0pphaHwDLkmQ 2.提交计算task的命令: mpiexec -np 4 lmp -in xxx.in #其中xxx.in表示in文件的名称,详情见 https://mp.weixin.qq.com/s/i6fa7xTKjlgSirPm0cj_4w 3.将MS的car和mdf文件导出data文件的方法:(命令…

第三十八讲:自增主键为什么不是连续的

你现在可以不懂,但以后面试的时候,必须要知道的三个关于自增主键的点 第一:唯一键冲突和事务回滚是导致自增主键不连续的两种大原因,此外批量插入数据的语句,MySQL 批量申请自增 id 的策略也是一个隐藏原因 第二:MySQL设计中不允许自增值回退的原因,主要是为了提升性能还…

NOIP 2024 退役记

人生有梦,各自精彩。Day -??? 摆疯了,啥也没复习,猫国建设者真好玩。 Day 0 昨晚回家结果摆到两点,感觉要在 noip 考场上睡着了/shui。早上没起来,迟到了/kk。 上午没怎么复习,摆摆摆,哎哎哎。 中午疯狂看小说,败犬太好看辣! 下午出发淄博。路上一直在睡觉,后悔没…

MySQL底层概述—4.InnoDB数据文件

大纲 1.表空间文件结构 (1)表空间Tablesapce (2)段Segment (3)区Extend (4)页Page (5)行Row 2.Page结构 (1)页结构各部分说明 (2)页结构整体划分 3.行记录格式 (1)行格式分类 (2)COMPACT行记录格式 (3)Compact中的行溢出机制 (4)其他行格式记录1.表空间文件结构 (1)表空间Table…

gin

Gin Gin入门 gin的学习要点如何定义路由:包括参数路由、通配符路由 如何处理输入输出 如何使用middleware解决AOP问题在 Gin 里面,用 Engine 来监听一个端口,是一个逻辑上的服务器。 一个 Go 进程可以创建多个 Engine。 hello, world 使用步骤:在应用中引入 Gin 依赖:go g…

MySQL底层概述—3.InnoDB线程模型

大纲 1.InnoDB的线程模型 2.IO Thread 3.Purge Thread 4.Page Cleaner Thread 5.Master Thread1.InnoDB的线程模型 InnoDB存储引擎是多线程的模型,因此其后台有多个不同的后台线程,负责处理不同的任务。后台线程的作用一:负责刷新内存池中的数据,保证缓冲池中的内存缓存是最…

20222427 2024-2025-1 《网络与系统攻防技术》实验七实验报告

1.实验内容 1.1 本周学习内容本周学习了有关Web安全的相关知识,复习了一些有关于Web的基础知识,比如:前、后端的定义,以及在前后端各自使用的语言,如:html、css、JS(前端);C/C++、Python、Java、Go、Php(后端)等。学习了有关于数据库攻击的一些基本操作,如:SQL注入…

基于Java+SpringBoot+Mysql实现的点卡各种卡寄售平台功能设计与实现四

部分功能:实名认证信息数据层Dao、银行卡类型信息数据层Dao、卡种类信息数据层Dao、卡类型信息数据层Dao、卡面值信息数据层Dao一、前言介绍: 免费学习:猿来入此 1.1 项目摘要 随着电子商务和在线支付技术的快速发展,数字商品和虚拟货币的交易需求日益增长。点卡及各种卡类…

kafka的搭建与使用

官网下载地址https://kafka.apache.org/downloads1、上传解压tar -zxvf kafka_2.11-1.0.0.tgz -C ../ mv kafka_2.11-1.0.0 kafka-1.0.02、修改环境变量 配置环境变量vim /etc/profileexport KAFKA_HOME=/usr/local/soft/kafka-1.0.0 export PATH=$PATH:$KAFKA_HOME/binsource …