项目需要接收并处理设备上报的日志,由于上报数据量大,因此按日期对数据进行分表存储。
引入所需的依赖(pom.xml):
<!-- Maven dependencies for the date-sharded logging service. -->
<!-- Entries without a <version> presumably inherit it from a parent POM / BOM that is not shown here; confirm. -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- AMQP starter: the message consumer in this document is declared with @RabbitListener -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>
<dependency>
    <groupId>com.alibaba.fastjson2</groupId>
    <artifactId>fastjson2</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.23</version>
</dependency>
<!--shardingJDBC-->
<dependency>
    <groupId>org.apache.shardingsphere</groupId>
    <artifactId>sharding-jdbc-spring-boot-starter</artifactId>
    <version>4.1.1</version>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <scope>test</scope>
</dependency>
在application.yml 中配置数据库分表:
# Spring Boot configuration: one data source (ds0) whose two logic tables are
# sharded on create_time by a custom precise sharding algorithm.
spring:
  application:
    name: data-system
  profiles:
    active: local
  # Disable camel-case name conversion (use entity/column names as written)
  jpa:
    hibernate:
      naming:
        physical-strategy: org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl
  # Sharding-JDBC configuration
  shardingsphere:
    datasource:
      names: ds0
      ds0:
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        # NOTE(review): the URL and credentials appear to have been elided in the source;
        # supply the full JDBC URL (quoted) and the real username/password.
        jdbc-url: jdbc:mysql:
        username: 
        password: 
    # Table distribution and sharding strategy per logic table
    sharding:
      tables:
        ali_data:
          actual-data-nodes: ds0.ali_data
          key-generator:
            # Generate the primary key column `id` with the SNOWFLAKE strategy
            column: id
            type: SNOWFLAKE
          table-strategy:
            standard:
              # Sharding column
              sharding-column: create_time
              # Fully-qualified class name of the precise sharding algorithm
              precise-algorithm-class-name: com.chunmi.data.group.shardingjdbc.PreciseAlgorithmCustomer
        data_source:
          actual-data-nodes: ds0.data_source
          key-generator:
            column: id
            type: SNOWFLAKE
          table-strategy:
            standard:
              sharding-column: create_time
              precise-algorithm-class-name: com.chunmi.data.group.shardingjdbc.PreciseAlgorithmCustomer
    # Print the SQL ShardingSphere actually executes; handy while debugging to see which physical table is hit
    props:
      sql:
        show: true
分片算法:
@Component public class PreciseAlgorithmCustomer implements PreciseShardingAlgorithm<Date> {private static ShardingAlgorithmReload shardingAlgorithmReload;@Autowiredpublic void setShardingAlgorithmReload(ShardingAlgorithmReload shardingAlgorithmReload) {PreciseAlgorithmCustomer.shardingAlgorithmReload = shardingAlgorithmReload;}@Overridepublic String doSharding(Collection<String> collection, PreciseShardingValue<Date> preciseShardingValue) {String suffix = ShardingDateUtil.getYearMonthDay(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(preciseShardingValue.getValue()));String preciseTable = preciseShardingValue.getLogicTableName() + "_" + suffix;if (collection.contains(preciseTable)) {return preciseTable;} else {String table = shardingAlgorithmReload.tryCreateShardingTable(preciseShardingValue.getLogicTableName(), suffix);if (StringUtils.isNotBlank(table)) {return table;} else {throw new IllegalArgumentException("未找到匹配的数据表");}}} }
新建分表并重载表名缓存:
@Slf4j @Component public class ShardingAlgorithmReload {@Resourceprivate ShardingDataSource shardingDataSource;private ShardingRuntimeContext runtimeContext;/*** 重载表缓存*/public void tableNameCacheReloadAll() {ShardingRuntimeContext runtimeContext = getRuntimeContext();List<TableRule> tableRuleList = (List<TableRule>) runtimeContext.getRule().getTableRules();for (TableRule tableRule : tableRuleList) {String nodeName = tableRule.getActualDatasourceNames().stream().findFirst().get();Set<String> tablesInDBSet = queryTables(tableRule.getLogicTable());refreshTableRule(tableRule, nodeName, tablesInDBSet);}}protected void refreshTableRule(TableRule tableRule, String nodeName, Set<String> tablesInDBSet) {// sharding缓存的表名Set<String> tableSets = getActualTables(tableRule);// 刷新if (!tableContrast(tableSets, tablesInDBSet)) {List<String> tableList = new ArrayList<>(tablesInDBSet);setDatasourceToTablesMap(tableRule, nodeName, tableList);}}private boolean tableContrast(Set<String> actualTableSets, Set<String> tablesInDBSet) {if (actualTableSets == null || tablesInDBSet == null) {return false;}if (actualTableSets.size() != tablesInDBSet.size()) {return false;}return actualTableSets.containsAll(tablesInDBSet);}protected void refreshShardingAlgorithm(TableRule tableRule, String nodeName) {// 获取分库分表时真正使用的表名Map<String, Set<String>> datasourceToTablesMap = getDatasourceToTablesMap(tableRule);Set<String> tables = datasourceToTablesMap.get(nodeName);ShardingStrategy shardingStrategy = tableRule.getTableShardingStrategy();if (shardingStrategy instanceof ComplexShardingStrategy) {ShardingAlgorithm algorithm = getObjectField(shardingStrategy, "shardingAlgorithm");setValueToBaseAlgorithm(tableRule, algorithm, nodeName, tables);} else if (shardingStrategy instanceof HintShardingStrategy) {ShardingAlgorithm algorithm = getObjectField(shardingStrategy, "shardingAlgorithm");setValueToBaseAlgorithm(tableRule, algorithm, nodeName, tables);} else if (shardingStrategy instanceof 
StandardShardingStrategy) {ShardingAlgorithm preciseAlgorithm = getObjectField(shardingStrategy, "preciseShardingAlgorithm");setValueToBaseAlgorithm(tableRule, preciseAlgorithm, nodeName, tables);ShardingAlgorithm rangeAlgorithm = getObjectField(shardingStrategy, "rangeShardingAlgorithm");setValueToBaseAlgorithm(tableRule, rangeAlgorithm, nodeName, tables);}}private void setValueToBaseAlgorithm(TableRule tableRule, ShardingAlgorithm algorithm, String nodeName, Set<String> tables) {if (algorithm != null && algorithm instanceof BaseShardingAlgorithm) {BaseShardingAlgorithm baseShardingAlgorithm = (BaseShardingAlgorithm) algorithm;baseShardingAlgorithm.setLogicTable(tableRule.getLogicTable());baseShardingAlgorithm.setTables(tables);baseShardingAlgorithm.setTableRule(tableRule);baseShardingAlgorithm.setNodeName(nodeName);}}protected ShardingRuntimeContext getRuntimeContext() {try {if (runtimeContext == null) {Method getRuntimeContextMethod = shardingDataSource.getClass().getDeclaredMethod("getRuntimeContext");getRuntimeContextMethod.setAccessible(true);runtimeContext = (ShardingRuntimeContext) getRuntimeContextMethod.invoke(shardingDataSource, null);}} catch (Exception e) {log.error("发生异常:" + e);}return runtimeContext;}protected Set<String> getActualTables(TableRule tableRule) {Set<String> tables = getObjectField(tableRule, "actualTables");return tables == null ? new LinkedHashSet<>() : tables;}protected void setDatasourceToTablesMap(TableRule tableRule, String nodeName, List<String> newTableList) {synchronized (tableRule) {Map<String, Set<String>> datasourceToTablesMap = getDatasourceToTablesMap(tableRule);Set<String> tables = datasourceToTablesMap.get(nodeName);Collections.sort(newTableList);tables.clear();tables.addAll(newTableList);}}protected Map<String, Set<String>> getDatasourceToTablesMap(TableRule tableRule) {Map<String, Set<String>> tablesMap = getObjectField(tableRule, "datasourceToTablesMap");return tablesMap == null ? 
new HashMap<>(0) : tablesMap;}protected static <T> T getObjectField(Object object, String fieldName) {try {Field field = object.getClass().getDeclaredField(fieldName);field.setAccessible(true);return (T) field.get(object);} catch (Exception e) {log.error("发生异常:{}", e);}return null;}protected Set<String> queryTables(String tableName) {Connection conn = null;Statement statement = null;ResultSet rs = null;Set<String> tables = null;try {conn = shardingDataSource.getConnection();statement = conn.createStatement();rs = statement.executeQuery("select table_name from information_schema.tables where table_schema ='ali_sourcedata' and table_name like '" + tableName + "%'");tables = new LinkedHashSet<>();while (rs.next()) {tables.add(rs.getString(1));}} catch (SQLException e) {log.error("获取数据库连接失败!", e);} finally {try {if (rs != null) {rs.close();}if (statement != null) {statement.close();}if (conn != null) {conn.close();}} catch (SQLException e) {log.error("关闭数据连接失败", e);}}return tables;}protected void createTable(String tableName, String suffix) {String tableAllName = tableName + "_" + suffix;String sql = null;if (Constant.FIELD_TABLE_DATA.equals(tableName)) {sql = "CREATE TABLE `" + tableAllName +"` (`id` bigint NOT NULL AUTO_INCREMENT,`deviceType` varchar(500) NOT NULL,`identifier` varchar(255) DEFAULT NULL,`method` varchar(255) DEFAULT NULL,`productKey` varchar(50) DEFAULT NULL,`deviceName` varchar(50) DEFAULT NULL," +"`time` bigint DEFAULT NULL,`value` varchar(500) DEFAULT NULL,`create_time` timestamp NULL DEFAULT NULL COMMENT '创建时间',PRIMARY KEY (`id`),KEY `idx_time_did_model` (`time`,`deviceName`,`productKey`)," +"KEY `idx_did` (`deviceName`),KEY `idx_model` (`productKey`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;";} else if (Constant.FIELD_TABLE_DATA_SOURCE.equals(tableName)) {sql = "CREATE TABLE `" + tableAllName +"` (`id` bigint NOT NULL AUTO_INCREMENT,`productKey` varchar(50) DEFAULT NULL COMMENT '产品model',`deviceName` varchar(50) DEFAULT 
NULL COMMENT '产品did',`source_data_json` json DEFAULT NULL COMMENT '源数据'," +"`create_time` timestamp NULL DEFAULT NULL COMMENT '创建时间',PRIMARY KEY (`id`), KEY `idx_deviceName` (`deviceName`),KEY `idx_productKey` (`productKey`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;";}Connection conn = null;Statement statement = null;try {conn = shardingDataSource.getConnection();statement = conn.createStatement();statement.executeUpdate(sql);} catch (SQLException e) {log.error("获取数据库连接失败!", e);} finally {try {if (statement != null) {statement.close();}if (conn != null) {conn.close();}} catch (SQLException e) {log.error("关闭数据连接失败", e);}}}public String tryCreateShardingTable(String tableName, String suffix) {String resTable = tableName + "_" + suffix;//建表 createTable(tableName, suffix);//重载 tableNameCacheReloadAll();return resTable;}}
工具类:
/**
 * Date formatting helpers used to derive sharding table suffixes.
 *
 * <p>Every call creates its own {@link SimpleDateFormat}, so no formatter instance is shared
 * between threads. Formatting and parsing use the JVM default time zone.
 */
public class ShardingDateUtil {

    public static final String DATE_FORMAT_DEFAULT = "yyyy-MM-dd HH:mm:ss";
    public static final String DATE_FORMAT_NUMBER = "yyyyMMddHHmmss";
    public static final String YEAR_MONTH_DAY_NUMBER = "yyyyMMdd";
    public static final String YEAR_MONTH_NUMBER = "yyyyMM";
    public static final String DATE_FORMAT_DAY_PATTERN = "yyyy-MM-dd";
    public static final String YEAR_MONTH_DAY_EN_SECOND = "yyyy/MM/dd HH:mm:ss";
    public static final String YEAR_MONTH_DAY_CN_SECOND = "yyyy年MM月dd日 HH时mm分ss秒";
    public static final String YEAR_MONTH_DAY_CN = "yyyy年MM月dd日";
    public static final String MONTH_DAY = "MM-dd";

    /**
     * Formats an epoch-millisecond timestamp as {@code yyyyMM}.
     *
     * @param date epoch milliseconds, may be {@code null}
     * @return the {@code yyyyMM} text, or {@code null} when {@code date} is {@code null}
     */
    public static String getYearMonth(Long date) {
        if (date == null) {
            return null;
        }
        return new SimpleDateFormat(YEAR_MONTH_NUMBER).format(new Date(date));
    }

    /**
     * Converts a {@code yyyy-MM-dd HH:mm:ss} timestamp into {@code yyyyMMdd}.
     *
     * <p>NOTE(review): when the text cannot be parsed, the current date is used instead and the
     * parse error is only printed. That fallback is kept as it was, but it means a malformed
     * value is routed to today's table — confirm this is intended.
     *
     * @param date timestamp text in {@link #DATE_FORMAT_DEFAULT} format, may be {@code null}
     * @return the {@code yyyyMMdd} text, or {@code null} when {@code date} is {@code null}
     */
    public static String getYearMonthDay(String date) {
        if (date == null) {
            return null;
        }
        Date parsed = new Date();
        try {
            parsed = new SimpleDateFormat(DATE_FORMAT_DEFAULT).parse(date);
        } catch (ParseException e) {
            e.printStackTrace();
        }
        return new SimpleDateFormat(YEAR_MONTH_DAY_NUMBER).format(parsed);
    }

    /**
     * Parses {@code date} with the given pattern and formats it as {@code yyyyMM}.
     *
     * <p>The previous implementation passed the text straight to
     * {@link SimpleDateFormat#format(Object)}, which rejects a {@code String} and therefore
     * threw {@link IllegalArgumentException} for every non-null input.
     *
     * @param date   date text, may be {@code null}
     * @param format pattern describing {@code date}; blank or {@code null} means
     *               {@link #DATE_FORMAT_DEFAULT}
     * @return the {@code yyyyMM} text, or {@code null} when {@code date} is {@code null}
     * @throws IllegalArgumentException if {@code date} does not match the pattern
     */
    public static String getYearMonth(String date, String format) {
        if (date == null) {
            return null;
        }
        String pattern = isBlank(format) ? DATE_FORMAT_DEFAULT : format;
        try {
            Date parsed = new SimpleDateFormat(pattern).parse(date);
            return new SimpleDateFormat(YEAR_MONTH_NUMBER).format(parsed);
        } catch (ParseException e) {
            throw new IllegalArgumentException(
                    "Cannot parse date '" + date + "' with pattern '" + pattern + "'", e);
        }
    }

    /** Returns {@code true} for {@code null}, empty, or whitespace-only text. */
    private static boolean isBlank(String text) {
        if (text == null) {
            return true;
        }
        for (int i = 0; i < text.length(); i++) {
            if (!Character.isWhitespace(text.charAt(i))) {
                return false;
            }
        }
        return true;
    }
}
初始化表:
import org.springframework.boot.CommandLineRunner; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component;import javax.annotation.Resource;@Order(value = 1) @Component public class ShardingTablesLoadRunner implements CommandLineRunner {@Resourceprivate ShardingAlgorithmReload shardingAlgorithmReload;@Overridepublic void run(String... args) throws Exception {shardingAlgorithmReload.tableNameCacheReloadAll();} }
添加多线程处理:
@EnableAsync @Configuration public class TheadPoolConfig {@Bean("CommonThreadPoolExecutor")public Executor syncExecutor() {// 获取可用处理器的Java虚拟机的数量int sum = Runtime.getRuntime().availableProcessors();System.out.println("系统最大线程数 -> " + sum);// 实例化自定义线程池ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();// 设置线程池中的核心线程数(最小线程数)executor.setCorePoolSize(5);// 设置线程池中的最大线程数executor.setMaxPoolSize(10);// 设置线程池中任务队列的容量executor.setQueueCapacity(25);// 设置线程池中空闲线程的存活时间executor.setKeepAliveSeconds(60);// 设置线程池中线程的名称前缀executor.setThreadNamePrefix("async-");// 设置线程池关闭时等待所有任务完成的时间。executor.setAwaitTerminationSeconds(60);// 设置线程池中任务队列已满时的拒绝策略,当线程池中的任务队列已满,而且线程池中的线程已经达到了最大线程数时,新的任务就无法被执行。这时就需要设置拒绝策略来处理这种情况。executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());// 设置线程池在关闭时是否等待所有任务完成executor.setWaitForTasksToCompleteOnShutdown(true);// 初始化线程池的配置 executor.initialize();return executor;} }
监听并处理 MQ 消息:
@Resourceprivate AliDataService aliDataService;@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "${queue.data-group}",durable = "true", exclusive = "false",autoDelete = "false",arguments = {@Argument(name = "x-message-ttl", value = "3600000", type = "java.lang.Long")}),exchange = @Exchange(name = "${com.chunmi.mq.feiyan.exchange}", type = "topic")))@RabbitHandler@Async(value = "CommonThreadPoolExecutor")public void consumer(String jsonStr) {log.info("物联网生活平台设备上报的消息:{}", jsonStr);JSONObject jsonObject = JSONObject.parseObject(jsonStr);// 处理全部设备事件this.processDataSource(jsonObject);// 处理设备事件 device_eventthis.processData(jsonObject);}