1 前言
假如给你一个场景,有一批1万或者10万的数据,让你插入到数据库中怎么做呢?我们这节来看看。
首先一点我们单纯的 一个个 INSERT 语句,我们就不试了,这一个个的肯定慢,我们这里统一用 INSERT INTO 表(字段1,字段2) VALUES(值1,值2),(值11,值22),(值111,值222);这种方式分批跑高效点。
2 实践
2.1 循环批量在一个事务
首先我们看一个简单的,就是在一个事务里,一个线程循环执行:
@Transactional(rollbackFor = Exception.class) public void batchSave() {// 分批// 至于分多少批:PgSQL 的占位符个数是有限制的 不能超过 Short.MAX(32767)// 所以一批最多 = 32767 / 你的一行字段个数// 比如我这里 = 32767 / 66个字段 = 496 也就是一批最多496个数据 我这里分450个List<List<OrderPo>> partition = Lists.partition(list, 450);StopWatch stopWatch = new StopWatch();stopWatch.start();// 顺序插入for (List<OrderPo> sub : partition) {orderMapper.batchSave(sub);}stopWatch.stop();log.info("耗时:" + stopWatch.getTotalTimeSeconds()); }
执行的效果:
// 1万条数据的耗时: 10000 6.9006843 10000 6.7756503 10000 5.7293283 10000 5.5923225 10000 5.818869 10000 5.8303096 // 10万条数据的耗时: 100000 58.5615497 100000 58.0595869 100000 58.5441196 100000 58.3003101 100000 57.1142761 100000 54.5769813 100000 53.8146378
这种方式最大的优点就是简单、纯粹,中间有出错,事务回滚,最大的缺点也是比较明显就是慢。
2.2 利用线程池并行插入
为了加快查询,我们引入线程池插入,也就是分批后交给各个线程并行插入:
// 线程池 private static final ThreadPoolExecutor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(8, 16, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024), new ThreadFactory() {// 线程名字private final String PREFIX = "BATCH_INSERT_";// 计数器private AtomicLong atomicLong = new AtomicLong();@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(null, r, PREFIX + atomicLong.incrementAndGet());return thread;} }); @SneakyThrows @Transactional(rollbackFor = Exception.class) public void batchSave() {// 分批// 至于分多少批:PgSQL 的占位符个数是有限制的 不能超过 Short.MAX(32767)// 所以一批最多 = 32767 / 你的一行字段个数// 比如我这里 = 32767 / 66个字段 = 496 也就是一批最多496个数据List<List<OrderPo>> partition = Lists.partition(list, 450);CountDownLatch countDownLatch = new CountDownLatch(partition.size());StopWatch stopWatch = new StopWatch();stopWatch.start();// 顺序插入for (List<OrderPo> sub : partition) {THREAD_POOL_EXECUTOR.execute(() -> {try {log.info("线程:{}开始处理", Thread.currentThread().getName());orderMapper.batchSave(sub);} finally {countDownLatch.countDown();}});}// 等待插入完毕 countDownLatch.await();stopWatch.stop();log.info("耗时:" + stopWatch.getTotalTimeSeconds()); }
看下执行效果:
// 1万条数据的耗时: 10000 4.6711569 10000 4.4839416 10000 4.4310133 10000 4.3802914 10000 3.8440867 10000 4.0849564 // 10万条数据的耗时: 100000 37.6524237 100000 35.1318877 100000 36.6338523 100000 36.4448236 100000 35.3499332 100000 36.0569744 100000 34.2736072
这种方式,优点是相对快了,但是缺点:事务下降到每个线程里了,可能会存在某个线程成功了,某个失败了,导致会存在数据丢,并且当并发比较高的时候,线程池队列满了呢?以及当前是阻塞的,await 会一致等,假如要加上等待时间,那等待时间设置多少呢?都是要考量的。
2.3 线程池并行插入但共用一个事务
可以将上边的多线程共用到一个事务里,也就是不再用声明式事务,我们可以用编程式事务,并且要让他们共用一个事务的话,其实说白了就是要共用一个数据库连接,可以参考我前的【Spring】【Mybatis】【事务】Spring + MyBaits + 事务 三者是如何协调的呢?(从一个数据库连接串一串 Spring、Mybatis、事务的联系)、【Spring】【Mybatis】【Dynamic-Datasource】【事务】Spring + MyBaits + 事务 + 动态数据源 四者是如何协调的呢?(从一个数据库连接串一串四者的联系),我这里实现方式如下:
// 线程池 private static final ThreadPoolExecutor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(8, 16, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024), new ThreadFactory() {// 线程名字private final String PREFIX = "BATCH_INSERT_";// 计数器private AtomicLong atomicLong = new AtomicLong();@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(null, r, PREFIX + atomicLong.incrementAndGet());return thread;} }); @SneakyThrows public void batchSave() {// 分批// 至于分多少批:PgSQL 的占位符个数是有限制的 不能超过 Short.MAX(32767)// 所以一批最多 = 32767 / 你的一行字段个数// 比如我这里 = 32767 / 66个字段 = 496 也就是一批最多496个数据List<List<OrderPo>> partition = Lists.partition(list, 450);// 手动事务提前创建出来DefaultTransactionDefinition transactionDefinition = new DefaultTransactionDefinition();transactionDefinition.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);// 提前获取连接TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);// 获取数据源以及连接 供多线程使用DataSource dataSource = dataSourceTransactionManager.getDataSource();Object resource = TransactionSynchronizationManager.getResource(dataSource);// 异常标志AtomicBoolean exceptionFlag = new AtomicBoolean(false);boolean poolExceptionFlag = false;// 计数器等待执行完毕CountDownLatch countDownLatch = new CountDownLatch(partition.size());StopWatch stopWatch = new StopWatch();stopWatch.start();// 顺序插入for (List<OrderPo> sub : partition) {try {THREAD_POOL_EXECUTOR.execute(() -> {try {// 如果没有发生异常if (exceptionFlag.get()) {log.info("有其他线程执行失败,后续无需执行,因为最终会回滚");return;}// 释放上次绑定的数据源连接try {TransactionSynchronizationManager.unbindResource(dataSource);} catch (Exception ignored){}// 装上本次使用的连接 TransactionSynchronizationManager.bindResource(dataSource, resource);log.info("线程:{}开始处理", Thread.currentThread().getName());// 执行插入 orderMapper.batchSave(sub);// 模拟异常if (ThreadLocalRandom.current().nextInt(3) == 1) {int i = 1/0;}} catch (Exception e) {// 发生异常设置异常标志log.error(String.format("线程:%s我发生了异常,e:%s", Thread.currentThread().getName(), e.getMessage()), e);exceptionFlag.set(true);} finally {// 不管是成功还是失败 都要计数器 -1 countDownLatch.countDown();}});} catch (Exception e) {// 提交任务失败 那就是失败了exceptionFlag.set(true);log.info("当前线程池繁忙,请稍后重试");dataSourceTransactionManager.rollback(transactionStatus);poolExceptionFlag = true;break;}}// 等待执行完毕 这里有个隐患 等待多长时间呢? 线程池任务过多的话最严重的情况 就是一直要在这里阻塞// 因为事务的提交还是回滚都交给了 主任务线程// 如果提交到线程池都成功了的话 就等待都执行完if (!poolExceptionFlag) {countDownLatch.await();}// 异常标志来做提交还是回滚if (exceptionFlag.get()) {// 发生异常 回滚 dataSourceTransactionManager.rollback(transactionStatus);} else {// 未发生异常 可以提交 dataSourceTransactionManager.commit(transactionStatus);}stopWatch.stop();log.info("耗时:" + stopWatch.getTotalTimeSeconds()); }
这种方式相对于上边一种,事务是共用到一个事务了,但是用到线程池以及队列满了如何呢?以及阻塞当前线程的问题。
2.4 批量任务表
那么基于这种批量操作,我们是不是可以建立两张表,思路如下:
至于如何异步执行每个明细,我们可以用 XXL-JOB定时去捞执行失败或者未执行的任务,如果任务数量比较多的话,捞出来通过发送 MQ 均摊的方式处理掉。
3 小结
大家要是有更好的思路胡总和有哪里理解不对的地方,还请指正哈。