SpringBoot整合ShardingJdbc分表

news/2024/9/19 18:15:28/文章来源:https://www.cnblogs.com/wlong-blog/p/18421103

项目中处理接收设备上报日志需求,上报数据量大,因此对数据进行按日期分表处理。

引入所需jar :

 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>com.alibaba.fastjson2</groupId><artifactId>fastjson2</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-jpa</artifactId></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.23</version></dependency><!--shardingJDBC--><dependency><groupId>org.apache.shardingsphere</groupId><artifactId>sharding-jdbc-spring-boot-starter</artifactId><version>4.1.1</version></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><scope>test</scope></dependency>

在application.yml 中配置数据库分表:

spring:application:name: data-systemprofiles:active: local# 关闭驼峰命名jpa:hibernate:naming:physical-strategy: org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl# sharding jdbc配置shardingsphere:datasource:names: ds0ds0:type: com.zaxxer.hikari.HikariDataSourcedriver-class-name: com.mysql.cj.jdbc.Driverjdbc-url: jdbc:mysql:username: password: # 配置表的分布,表的策略sharding:tables:ali_data:actual-data-nodes: ds0.ali_datakey-generator:# 指定表 主键id 生成策略为 SNOWFLAKEcolumn: idtype: SNOWFLAKEtable-strategy:standard:# 分片字段sharding-column: create_time# 精确算法实现类路径precise-algorithm-class-name: com.chunmi.data.group.shardingjdbc.PreciseAlgorithmCustomerdata_source:actual-data-nodes: ds0.data_sourcekey-generator:column: idtype: SNOWFLAKEtable-strategy:standard:sharding-column: create_timeprecise-algorithm-class-name: com.chunmi.data.group.shardingjdbc.PreciseAlgorithmCustomer# 打开ShardingSphere-sql输出日志---调试时方便查看具体哪张表props:sql:show: true

分片算法:

@Component
public class PreciseAlgorithmCustomer implements PreciseShardingAlgorithm<Date> {private static ShardingAlgorithmReload shardingAlgorithmReload;@Autowiredpublic void setShardingAlgorithmReload(ShardingAlgorithmReload shardingAlgorithmReload) {PreciseAlgorithmCustomer.shardingAlgorithmReload = shardingAlgorithmReload;}@Overridepublic String doSharding(Collection<String> collection, PreciseShardingValue<Date> preciseShardingValue) {String suffix = ShardingDateUtil.getYearMonthDay(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(preciseShardingValue.getValue()));String preciseTable = preciseShardingValue.getLogicTableName() + "_" + suffix;if (collection.contains(preciseTable)) {return preciseTable;} else {String table = shardingAlgorithmReload.tryCreateShardingTable(preciseShardingValue.getLogicTableName(), suffix);if (StringUtils.isNotBlank(table)) {return table;} else {throw new IllegalArgumentException("未找到匹配的数据表");}}}
}

新建表以及重载:

@Slf4j
@Component
public class ShardingAlgorithmReload {@Resourceprivate ShardingDataSource shardingDataSource;private ShardingRuntimeContext runtimeContext;/*** 重载表缓存*/public void tableNameCacheReloadAll() {ShardingRuntimeContext runtimeContext = getRuntimeContext();List<TableRule> tableRuleList = (List<TableRule>) runtimeContext.getRule().getTableRules();for (TableRule tableRule : tableRuleList) {String nodeName = tableRule.getActualDatasourceNames().stream().findFirst().get();Set<String> tablesInDBSet = queryTables(tableRule.getLogicTable());refreshTableRule(tableRule, nodeName, tablesInDBSet);}}protected void refreshTableRule(TableRule tableRule, String nodeName, Set<String> tablesInDBSet) {// sharding缓存的表名Set<String> tableSets = getActualTables(tableRule);// 刷新if (!tableContrast(tableSets, tablesInDBSet)) {List<String> tableList = new ArrayList<>(tablesInDBSet);setDatasourceToTablesMap(tableRule, nodeName, tableList);}}private boolean tableContrast(Set<String> actualTableSets, Set<String> tablesInDBSet) {if (actualTableSets == null || tablesInDBSet == null) {return false;}if (actualTableSets.size() != tablesInDBSet.size()) {return false;}return actualTableSets.containsAll(tablesInDBSet);}protected void refreshShardingAlgorithm(TableRule tableRule, String nodeName) {// 获取分库分表时真正使用的表名Map<String, Set<String>> datasourceToTablesMap = getDatasourceToTablesMap(tableRule);Set<String> tables = datasourceToTablesMap.get(nodeName);ShardingStrategy shardingStrategy = tableRule.getTableShardingStrategy();if (shardingStrategy instanceof ComplexShardingStrategy) {ShardingAlgorithm algorithm = getObjectField(shardingStrategy, "shardingAlgorithm");setValueToBaseAlgorithm(tableRule, algorithm, nodeName, tables);} else if (shardingStrategy instanceof HintShardingStrategy) {ShardingAlgorithm algorithm = getObjectField(shardingStrategy, "shardingAlgorithm");setValueToBaseAlgorithm(tableRule, algorithm, nodeName, tables);} else if (shardingStrategy instanceof StandardShardingStrategy) {ShardingAlgorithm preciseAlgorithm = getObjectField(shardingStrategy, "preciseShardingAlgorithm");setValueToBaseAlgorithm(tableRule, preciseAlgorithm, nodeName, tables);ShardingAlgorithm rangeAlgorithm = getObjectField(shardingStrategy, "rangeShardingAlgorithm");setValueToBaseAlgorithm(tableRule, rangeAlgorithm, nodeName, tables);}}private void setValueToBaseAlgorithm(TableRule tableRule, ShardingAlgorithm algorithm, String nodeName, Set<String> tables) {if (algorithm != null && algorithm instanceof BaseShardingAlgorithm) {BaseShardingAlgorithm baseShardingAlgorithm = (BaseShardingAlgorithm) algorithm;baseShardingAlgorithm.setLogicTable(tableRule.getLogicTable());baseShardingAlgorithm.setTables(tables);baseShardingAlgorithm.setTableRule(tableRule);baseShardingAlgorithm.setNodeName(nodeName);}}protected ShardingRuntimeContext getRuntimeContext() {try {if (runtimeContext == null) {Method getRuntimeContextMethod = shardingDataSource.getClass().getDeclaredMethod("getRuntimeContext");getRuntimeContextMethod.setAccessible(true);runtimeContext = (ShardingRuntimeContext) getRuntimeContextMethod.invoke(shardingDataSource, null);}} catch (Exception e) {log.error("发生异常:" + e);}return runtimeContext;}protected Set<String> getActualTables(TableRule tableRule) {Set<String> tables = getObjectField(tableRule, "actualTables");return tables == null ? new LinkedHashSet<>() : tables;}protected void setDatasourceToTablesMap(TableRule tableRule, String nodeName, List<String> newTableList) {synchronized (tableRule) {Map<String, Set<String>> datasourceToTablesMap = getDatasourceToTablesMap(tableRule);Set<String> tables = datasourceToTablesMap.get(nodeName);Collections.sort(newTableList);tables.clear();tables.addAll(newTableList);}}protected Map<String, Set<String>> getDatasourceToTablesMap(TableRule tableRule) {Map<String, Set<String>> tablesMap = getObjectField(tableRule, "datasourceToTablesMap");return tablesMap == null ? new HashMap<>(0) : tablesMap;}protected static <T> T getObjectField(Object object, String fieldName) {try {Field field = object.getClass().getDeclaredField(fieldName);field.setAccessible(true);return (T) field.get(object);} catch (Exception e) {log.error("发生异常:{}", e);}return null;}protected Set<String> queryTables(String tableName) {Connection conn = null;Statement statement = null;ResultSet rs = null;Set<String> tables = null;try {conn = shardingDataSource.getConnection();statement = conn.createStatement();rs = statement.executeQuery("select table_name from information_schema.tables where table_schema ='ali_sourcedata' and table_name like '" + tableName + "%'");tables = new LinkedHashSet<>();while (rs.next()) {tables.add(rs.getString(1));}} catch (SQLException e) {log.error("获取数据库连接失败!", e);} finally {try {if (rs != null) {rs.close();}if (statement != null) {statement.close();}if (conn != null) {conn.close();}} catch (SQLException e) {log.error("关闭数据连接失败", e);}}return tables;}protected void createTable(String tableName, String suffix) {String tableAllName = tableName + "_" + suffix;String sql = null;if (Constant.FIELD_TABLE_DATA.equals(tableName)) {sql = "CREATE TABLE `" + tableAllName +"` (`id` bigint NOT NULL AUTO_INCREMENT,`deviceType` varchar(500) NOT NULL,`identifier` varchar(255) DEFAULT NULL,`method` varchar(255) DEFAULT NULL,`productKey` varchar(50) DEFAULT NULL,`deviceName` varchar(50) DEFAULT NULL," +"`time` bigint DEFAULT NULL,`value` varchar(500) DEFAULT NULL,`create_time` timestamp NULL DEFAULT NULL COMMENT '创建时间',PRIMARY KEY (`id`),KEY `idx_time_did_model` (`time`,`deviceName`,`productKey`)," +"KEY `idx_did` (`deviceName`),KEY `idx_model` (`productKey`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;";} else if (Constant.FIELD_TABLE_DATA_SOURCE.equals(tableName)) {sql = "CREATE TABLE `" + tableAllName +"` (`id` bigint NOT NULL AUTO_INCREMENT,`productKey` varchar(50) DEFAULT NULL COMMENT '产品model',`deviceName` varchar(50) DEFAULT NULL COMMENT '产品did',`source_data_json` json DEFAULT NULL COMMENT '源数据'," +"`create_time` timestamp NULL DEFAULT NULL COMMENT '创建时间',PRIMARY KEY (`id`), KEY `idx_deviceName` (`deviceName`),KEY `idx_productKey` (`productKey`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;";}Connection conn = null;Statement statement = null;try {conn = shardingDataSource.getConnection();statement = conn.createStatement();statement.executeUpdate(sql);} catch (SQLException e) {log.error("获取数据库连接失败!", e);} finally {try {if (statement != null) {statement.close();}if (conn != null) {conn.close();}} catch (SQLException e) {log.error("关闭数据连接失败", e);}}}public String tryCreateShardingTable(String tableName, String suffix) {String resTable = tableName + "_" + suffix;//建表
        createTable(tableName, suffix);//重载
        tableNameCacheReloadAll();return resTable;}}

工具类:

public class ShardingDateUtil {public static final String DATE_FORMAT_DEFAULT = "yyyy-MM-dd HH:mm:ss";public static final String DATE_FORMAT_NUMBER = "yyyyMMddHHmmss";public static final String YEAR_MONTH_DAY_NUMBER = "yyyyMMdd";public static final String YEAR_MONTH_NUMBER = "yyyyMM";public static final String DATE_FORMAT_DAY_PATTERN = "yyyy-MM-dd";public static final String YEAR_MONTH_DAY_EN_SECOND = "yyyy/MM/dd HH:mm:ss";public static final String YEAR_MONTH_DAY_CN_SECOND = "yyyy年MM月dd日 HH时mm分ss秒";public static final String YEAR_MONTH_DAY_CN = "yyyy年MM月dd日";public static final String MONTH_DAY = "MM-dd";public static String getYearMonth(Long date) {if (date == null) {return null;}return new SimpleDateFormat(YEAR_MONTH_NUMBER).format(date);}public static String getYearMonthDay(String date) {if (date == null) {return null;}String format = DATE_FORMAT_DEFAULT;Date parse = new Date();try {parse = new SimpleDateFormat(format).parse(date);} catch (ParseException e) {e.printStackTrace();}return new SimpleDateFormat(YEAR_MONTH_DAY_NUMBER).format(parse);}public static String getYearMonth(String date, String format) {if (date == null) {return null;}if (StringUtils.isBlank(format)) {format = DATE_FORMAT_DEFAULT;}SimpleDateFormat simpleDateFormat = new SimpleDateFormat(format);return simpleDateFormat.format(date);}
}

初始化表:

import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;import javax.annotation.Resource;@Order(value = 1)
@Component
public class ShardingTablesLoadRunner implements CommandLineRunner {@Resourceprivate ShardingAlgorithmReload shardingAlgorithmReload;@Overridepublic void run(String... args) throws Exception {shardingAlgorithmReload.tableNameCacheReloadAll();}
}

添加多线程处理:

@EnableAsync
@Configuration
public class TheadPoolConfig {@Bean("CommonThreadPoolExecutor")public Executor syncExecutor() {// 获取可用处理器的Java虚拟机的数量int sum = Runtime.getRuntime().availableProcessors();System.out.println("系统最大线程数 -> " + sum);// 实例化自定义线程池ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();// 设置线程池中的核心线程数(最小线程数)executor.setCorePoolSize(5);// 设置线程池中的最大线程数executor.setMaxPoolSize(10);// 设置线程池中任务队列的容量executor.setQueueCapacity(25);// 设置线程池中空闲线程的存活时间executor.setKeepAliveSeconds(60);// 设置线程池中线程的名称前缀executor.setThreadNamePrefix("async-");// 设置线程池关闭时等待所有任务完成的时间。executor.setAwaitTerminationSeconds(60);// 设置线程池中任务队列已满时的拒绝策略,当线程池中的任务队列已满,而且线程池中的线程已经达到了最大线程数时,新的任务就无法被执行。这时就需要设置拒绝策略来处理这种情况。executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());// 设置线程池在关闭时是否等待所有任务完成executor.setWaitForTasksToCompleteOnShutdown(true);// 初始化线程池的配置
        executor.initialize();return executor;}
}

接口处理mq消息:

  @Resourceprivate AliDataService aliDataService;@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "${queue.data-group}",durable = "true", exclusive = "false",autoDelete = "false",arguments = {@Argument(name = "x-message-ttl", value = "3600000", type = "java.lang.Long")}),exchange = @Exchange(name = "${com.chunmi.mq.feiyan.exchange}", type = "topic")))@RabbitHandler@Async(value = "CommonThreadPoolExecutor")public void consumer(String jsonStr) {log.info("物联网生活平台设备上报的消息:{}", jsonStr);JSONObject jsonObject = JSONObject.parseObject(jsonStr);// 处理全部设备事件this.processDataSource(jsonObject);// 处理设备事件 device_eventthis.processData(jsonObject);}

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/800003.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

vscode 搜索框3个按钮分别代表什么

https://blog.csdn.net/u012292754/article/details/108307288相信坚持的力量,日复一日的习惯.

数据库系统 1 关系数据库

数据库系统 1 关系数据库 三层体系结构外部层:数据库的用户视图 概念层:数据库的整体视图,提供内、外部层的映射和必要的独立性所有实体,实体的属性和实体间的联系 数据的约束 数据的语义信息 安全性和完整性信息内部层:数据库在计算机上的物理表示数据独立性 三层体系的主…

记录一次首页优化的经历

公司最近要进行多品牌合一,原来五个品牌的app要合并为一个。品牌立项、审批、方案确定,历史数据迁移、前期的基础工程搭建,兼容以及涉及三方的交互以及改造,需求梳理等也都基本完成,原来计划9月中旬进行上线,但是上线后服务端的压测一直通不过-首页抗不过太高的并发。app…

软工作业3:结对项目——实现一个自动生成小学四则运算题目的命令行程序

这个作业属于哪个课程 https://edu.cnblogs.com/campus/gdgy/CSGrade22-34/这个作业要求在哪里 结对项目 - 作业 - 计科22级34班 - 班级博客 - 博客园 (cnblogs.com)这个作业的目标 结对项目——实现一个自动生成小学四则运算题目的命令行程序成员1 陈奕奕 3222004552成员2 林闰…

基于LangChain手工测试用例转App自动化测试生成工具

在传统编写 App 自动化测试用例的过程中,基本都是需要测试工程师,根据功能测试用例转换为自动化测试的用例。市面上自动生成 Web 或 App 自动化测试用例的产品无非也都是通过录制的方式,获取操作人的行为操作,从而记录测试用例。整个过程类似于但是通常录制出来的用例可用性…

单细胞数据 存储方式汇总

(单细胞下游分析——不同类型的数据读入,与部分数据类型的转化) .h5ad(anndata 数据格式) 10x_mtx(cell ranger输出,三个文件) 就是cell ranger上游比对分析产生的3个文件: ├── xxx_feature_bc_matrix │ ├── barcodes.tsv.gz:细胞标签(barcode) │ ├──…

springcloud组件openfeign超时时间设置

openfeign超时时间设置有两种方式 1、通过配置类;2、通过配置文件 1、使用配置类代码如下:@Configuration public class FeignConfig {@Beanpublic Request.Options options(){//第一个参数是连接超时时间,第二个参数是处理超时时间return new Request.Options(5000,3000);}…

python虚拟环境venv

创建目录 mkdir pyenv 进入 cd pyenv 初始化环境 python3 -m venv .进入bin目录 jihan@jihandeMacBook-Pro pyenv % cd bin jihan@jihandeMacBook-Pro bin % ls Activate.ps1 activate activate.csh activate.fish pip pip3 pip3.12 python python3 python3.12 jihan@jiha…

进行网站监控有必要吗?

在当今数字化高速发展的时代,网站已经成为了企业、组织乃至个人展示自身形象、提供服务、进行交流互动的重要平台。那么,进行网站监控有必要吗?答案无疑是肯定的。 进行网站监控,首先是保障用户体验的关键。对于访问网站的用户来说,他们期望能够快速、稳定地获取所需信息或…

Typora+picgo+jsDelivr实现免费图床

Typora+picgo+github+jsDelivr实现免费图床 需求 typora中写markdown图片是保存在本地的,为了简化写博客时繁琐的插入图片步骤,直接使用typora+picgo将图片上传到云端,这样发博客就只要复制markdown即可 步骤 前期准备:github中创建一个仓库用于保存图片 名字随便填就行1.下…

线上间歇性卡顿问题

事情起因 最近一段时间我们公司有个项目是做视力筛查的,平时都是正常的,但是最近这两天突然会时不时地卡顿一下,一卡就是几分钟。排查过程 1.查看日志 卡顿首先是排查日志,日志报的是feign调用学生服务超时,进到学生服务查看时,看到日志报的是事务超时2.继续排查,既然是…

关于springcloud中openfeign中服务调用日志输出

在使用openfeign进行服务调用的时候,我们可以通过一些配置,获取到服务调用的日志输出,可以从消费端看到日志 有两种方法:一、使用配置类;二、使用配置文件配置 日志输出级别有四种: NONE:不输出(默认) BASIC:只输出请求方式、url、请求成功与否 HEADERS:输出请求头的…