分页多线程处理大批量数据

1.业务场景

因为需要从一个返利明细表中获取大量的数据,生成返利报告,耗时相对较久,作为后台任务执行。但是后台任务如果不用多线程处理,也会要很长时间才能处理完。

另外考虑到数据量大,不能一次查询所有数据在内存中处理,为了防止内存溢出,分页查询数据,然后分批次多线程处理。

主要思想是采取分治的思想,首先分页查询数据,然后每页数据分成均匀的不同片段,多个线程处理这些片段,一个线程处理一个片段,可以加上等待的同步计数器,让这一页数据全部处理完后再去查询下一页的数据。

2.关键代码

//线程池配置
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(10,10,10L,TimeUnit.SECONDS,new LinkedBlockingQueue<>(200), new ThreadPoolExecutor.CallerRunsPolicy());public String generateReport(String periodType, String monthWid, String quarterWid) {int totalNum = 0;//计时器StopWatch stopWatch = new StopWatch();stopWatch.start();try {//这里省略了一些其他的逻辑,只关注分页查询然后多线程任务处理的逻辑......//查询总数量totalNum = getReportTotalNum(periodType, monthWid, quarterWid, totalNum);int pageIndex = 0;int pageSize = 500;int pageNum = 1;StoreRebateDetailForReportQueryReq req = null;while (pageNum <= (totalNum % pageSize == 0 ? (totalNum / pageSize) : (totalNum / pageSize + 1))) {//分页查询,每页500条数据pageIndex = pageSize * (pageNum - 1);List<StoreRebateDetail> list = storeRebateDetailService.selectListForRebateReport(pageIndex, pageSize);int batchNum = list.size();//每个线程处理100条                                                                                       int perThreadCount = 100;LOGGER.info("开始处理第{}页(共{}条)数据", pageNum, batchNum);final CountDownLatch cdl = new CountDownLatch((batchNum % perThreadCount) == 0 ? (batchNum / perThreadCount) : (batchNum / perThreadCount + 1)); //计数器for (int j = 0; j < batchNum; j++) {//每100条一个线程处理if (j % perThreadCount == 0) {int start = j;int end = (batchNum - j) >= perThreadCount ? (j + perThreadCount) : batchNum;int pageNums = pageNum;poolExecutor.submit(()->{LOGGER.info("第{}页的第{}-{}条数据处理开始", pageNums, start+1, end);//处理比较复杂的业务逻辑(耗时较久)processInsert(list, start, end);LOGGER.info("第{}页的第{}-{}条数据处理结束", pageNums, start+1, end);cdl.countDown();});}}cdl.await();pageNum++;}stopWatch.stop();double totalTimeSeconds = stopWatch.getTotalTimeSeconds();result.put("syncStatus", "success");result.put("syncMsg", "调度处理完毕,生成" + totalNum + "条数据,执行时间为" + totalTimeSeconds + "秒");return SToolUtils.convertResultJSONObj(CommonAbstractService.SUCCESS_STATUS, "处理成功", totalNum, new JSONArray().fluentAdd(result)).toString();} catch (Exception e) {stopWatch.stop();double totalTimeSeconds = stopWatch.getTotalTimeSeconds();LOGGER.error("调度处理异常:{}--{}", e.getMessage(), e);result.put("syncStatus", "fail");result.put("syncMsg", "调度处理完毕,生成" + totalNum + "条数据,执行时间为" + totalTimeSeconds + "秒");return SToolUtils.convertResultJSONObj(CommonAbstractService.ERROR_STATUS, "处理异常", 0, new JSONArray().fluentAdd(result)).toString();} finally {//做业务需要处理的,可以没有}}

后面改了个通用版,采用接口中的默认方法实现主要公共逻辑,其他几个需要不同实现的方法让子类去实现。

batchProcess方法为主要处理逻辑入口方法,供其子类继承,子类需要传递线程池、每页大小、每个线程处理的条数、查询数据的参数等参数。

processLongTimeLogic方法为处理时间比较长,需要多线程去执行的逻辑,子类直接覆写这个方法,将复杂的耗时比较长的业务逻辑放在里面就可以了。

queryTotalNum方法为查询总记录数的方法,子类去具体实现查询逻辑,查询数量是为了后续分页处理。

queryDataListByPage方法为分页查询数据的方法,也是子类去实现具体的逻辑,这里的第一个参数list加了泛型处理,<T>为查询数据返回的实体对象类,这样在后续处理的时候就不要去强转类型了。

这样子类只需要关注查询大表的查询逻辑,以及需要处理的具体业务逻辑,而不需要去处理分页和多线程处理的逻辑,这样增加了代码的可读性以及减少了出错的可能性。

public interface BatchProcessService<T> {/*** 批量处理,分页+多线程处理* @param poolExecutor       线程池* @param pageSize           每页查询的大小* @param perThreadCount     每个线程处理的记录数* @param queryTotalNumParam 查询记录总数的参数,必须继承PageReq* @param queryDataParam     查询分页列表的参数,必须继承PageReq* @param logger             子类的日志对象* @param otherParam         其他参数,需要给processLongTimeLogic方法传递的参数* @throws InterruptedException*/default int batchProcess(ThreadPoolExecutor poolExecutor, int pageSize, int perThreadCount, Object queryTotalNumParam, PageReq queryDataParam, Logger logger, Map<String, Object> otherParam) throws InterruptedException {int pageIndex = 0;int pageNum = 1;int totalNum = queryTotalNum(queryTotalNumParam);if (totalNum == 0) {logger.info("需要处理的数据数量为0");return 0;}try {while (pageNum <= (totalNum % pageSize == 0 ? (totalNum / pageSize) : (totalNum / pageSize + 1))) {pageIndex = pageSize * (pageNum - 1);queryDataParam.setPageIndex(pageIndex);queryDataParam.setPageRows(pageSize);List<T> list = queryDataListByPage(queryDataParam);int batchNum = list.size();final CountDownLatch cdl = new CountDownLatch((batchNum % perThreadCount) == 0 ? (batchNum / perThreadCount) : (batchNum / perThreadCount + 1)); //计数器for (int j = 1; j <= (batchNum % perThreadCount == 0 ? (batchNum / perThreadCount) : (batchNum / perThreadCount + 1)); j++) {//每100条一个线程处理int start = perThreadCount * (j - 1);int end = (batchNum - start) >= perThreadCount ? (start + perThreadCount) : batchNum;int pageNums = pageNum;poolExecutor.submit(() -> {logger.info("第{}页的第{}-{}条数据处理开始", pageNums, start + 1, end);//处理其他长时间的逻辑processLongTimeLogic(list.subList(start, end), otherParam);logger.info("第{}页的第{}-{}条数据处理结束", pageNums, start + 1, end);cdl.countDown();});}cdl.await();pageNum++;}} catch (Exception e) {logger.error("批量处理数据异常", e);throw e;}return totalNum;}/*** 查询记录总数** @param queryParam* @return*/int queryTotalNum(Object queryParam);/*** 分页查询数据** @param queryDataParam* @return*/List<T> queryDataListByPage(PageReq queryDataParam);/*** 处理长时间业务逻辑** @param list  处理的数据列表* @param otherParam 其他参数*/void processLongTimeLogic(List<T> list, Map<String, Object> otherParam);
}

PageReq类为分页查询参数的父类,里面包含了分页的一些属性,查询参数的实体继承该类就可以了,其他是自己的业务相关的参数。

import lombok.Getter;
import lombok.Setter;import java.io.Serializable;@Getter
@Setter
public class PageReq implements Serializable {/*** 当前页码*/private Integer pageIndex = 1;/*** 页大小*/private Integer pageRows = 10;public PageReq() {}public PageReq(Integer pageIndex, Integer pageRows) {this.pageIndex = pageIndex;this.pageRows = pageRows;}}

3.测试效果

原来跑一个月的数据需要40多分钟,后面通过这样处理后,采用5个线程跑,时间缩短至8分钟左右,相当于差不多时间缩短到原来的1/5。

image-20240320124945462

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/563156.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C# WPF编程-事件

C# WPF编程-路由事件 路由事件概要路由事件的三种方式 WPF事件WPF最重要的5类事件&#xff1a;生命周期事件 鼠标事件键盘事件多点触控输入原始触控 路由事件概要 路由事件是具有更强传播能力的事件&#xff0c;它们可在元素树中向上冒泡和向下隧道传播&#xff0c;并沿着传播…

Pink老师Echarts教学笔记

可视化面板介绍 ​ 应对现在数据可视化的趋势&#xff0c;越来越多企业需要在很多场景(营销数据&#xff0c;生产数据&#xff0c;用户数据)下使用&#xff0c;可视化图表来展示体现数据&#xff0c;让数据更加直观&#xff0c;数据特点更加突出。 01-使用技术 完成该项目需…

AIGC——ComfyUI使用SDXL双模型的工作流(附件SDXL模型下载)

SDXL算法概述 SDXL&#xff08;Stable Diffusion XL&#xff09;是Stable Diffusion公司发布的一款图像生成大模型。在以往的模型基础上&#xff0c;SDXL进行了极大的升级&#xff0c;其base模型参数数量达到了35亿&#xff0c;refiner模型参数数量达到了66亿。SDXL与之前的版…

STM32---DHT11温湿度传感器与BH1750FVI光照传感器(HAL库、含源码)

写在前面&#xff1a;本节我们学习使用两个常见的传感器模块&#xff0c;分别为DHT11温湿度传感器以及BH1750FVI光照传感器,这两种传感器在对于环境监测中具有十分重要的作用&#xff0c;因为其使用简单方便&#xff0c;所以经常被用于STM32的项目之中。今天将使用分享给大家&a…

NO9 蓝桥杯单片机实践之串口通信的使用

1 回顾 串口通信的代码编写结构还是与中断一样&#xff0c;不同的是&#xff1a; 初始中断函数条件涉及到串口通信相关的寄存器和定时器1相关的寄存器&#xff08;定时器1用于产生波特率&#xff09;&#xff0c;但初始条件中的中断寄存器只考虑串口通信而不考虑定时器1。 vo…

基于华为ensp的企业网络规划(新版)

基于华为ensp的企业网络规划&#xff08;新版&#xff09; 第一章 项目概述1.1 项目总体描述1.2 项目总体功能要求 第二章 可行性分析2.1 经济效益分析2.2 项目分析2.3 技术可行性分析2.4 项目风险分析 第三章 需求分析3.1 总体需求3.2 具体需求3.3 非功能需求 第四章 总体设计…

Day44:WEB攻防-PHP应用SQL盲注布尔回显延时判断报错处理增删改查方式

目录 PHP-MYSQL-SQL操作-增删改查 PHP-MYSQL-注入函数-布尔&报错&延迟 基于布尔的SQL盲注-逻辑判断(需要有回显,没回显搞不了)跟union需要的条件差不多 基于时间的SQL盲注-延时判断(不需要任何回显) 基于报错的SQL盲注-报错回显(需要报错回显&#xff0c;没报错回…

保研复习概率论1

1.什么是随机试验&#xff08;random trial&#xff09;&#xff1f; 如果一个试验满足试验可以在相同的条件下重复进行、试验所有可能结果明确可知&#xff08;或者是可知这个范围&#xff09;、每一次试验前会出现哪个结果事先并不确定&#xff0c;那么试验称为随机试验。 …

FileZilla 链接服务器提示 20 秒连接超时

FileZilla 有个默认设置是如果 20 秒没有数据的话会自动中断链接。 Command: Pass: **************** Error: Connection timed out after 20 seconds of inactivity Error: Could not connect to server修改配置 这个配置是可以修改的&#xff0c;修改的步骤为&#xff1a; …

C# Solidworks二次开发:获取主窗口API和创建新活动窗口API详解

今天要讲的是Solidworks中的两个API。 &#xff08;1&#xff09;Frame Method (ISldWorks)&#xff1a;获取SOLIDWORKS主框架。 下面是API中给出的例子&#xff1a; public void Main(){ModelDoc2 swModelDoc default(ModelDoc2);Frame swFrame default(Frame);ModelWindow…

网络安全实训Day8

写在前面 网络工程终于讲完了。这星期到了网络安全技术部分。 网络安全实训-网络安全技术 网络安全概述 信息安全&#xff1a;所有保障计算机硬件、系统、软件、数据不因有意或无意的行为导致的服务中断、数据损坏或丢失等安全事件的保障技术 网络安全&#xff1a;基于计算机…

解决淘宝镜像过期问题 ERR! request https://registry.npm.taobao.org

目录 一、问题描述 二、解决方案 2.1、针对于域名更换解决方案 2.2、针对于证书过期解决方案 三、进行测试 一、问题描述 针对于2022年5月31号和2024年1 月 22 日前的前端项目 npm.taobao.org和旧域名于2021年官方公告域名更换事件&#xff0c;已于2022年05月31日零时起…