事务中无法切换数据源?DataSourceSwitchInvoker:轻松实现多数据源切换执行工具类

news/2025/2/25 23:29:49/文章来源:https://www.cnblogs.com/zuowj/p/18737399

背景:

在有标注为@Transactional的类或公共方法中(传播特性,如:NOT_SUPPORTED、SUPPORTS、REQUIRED【默认值】、REQUIRES_NEW)执行数据源切换可能不成功(比如:主从数据源切换,多数据源切换等,均会发现切换不成功,或“偶尔又切换成功”),导致本应该需要查主库却查了从库,本应该查B库却仍查了A库导致表不存在等各种查询问题。

原因是什么呢?

本质原因是:因为只要添加了@Transactional (传播特性,如:NOT_SUPPORTED、SUPPORTS、REQUIRED【默认值】、REQUIRES_NEW),在事务同步上下文类型为:SYNCHRONIZATION_ALWAYS时 ,那么会在事务切面中进行初始化事务同步上下文状态【prepareTransactionStatus】(具体可分析代码位置:org.springframework.transaction.support.AbstractPlatformTransactionManager#getTransaction),此时org.springframework.transaction.support.TransactionSynchronizationManager#isSynchronizationActive 是true,若需要事务时(EQUIRED【默认值】、REQUIRES_NEW)则还会org.springframework.transaction.support.AbstractPlatformTransactionManager#doBegin获取connection并开启事务且构建ConnectionHolder注册保存于事务同步上下文中,当mybatis 的SqlSessionTemplate.SqlSessionInterceptor.invoke执行时,第一次会将获取的SqlSession通过SqlSessionUtils.registerSessionHolder注册保存于事务同步上下文中,后续只要是同一个SqlSession,那么间接的就是持有同一个SpringManagedTransaction,SpringManagedTransaction是优先从ConnectionHolder获取已有connection对象,若不存在才会创建新的connection对象,并构建ConnectionHolder注册保存于事务同步上下文中,后续只要是在同一个事务同步上下文中,那么都是复用相同的SqlSession、SpringManagedTransaction、ConnectionHolder,所以单纯的改DataSource(ThreadLocal的线程变量)没有用,因为此时ConnectionHolder中保存的是Connection,而不是DataSource

Spring声明式事务源代码分析流程图

为何偶尔切换数据源成功?

当为事务传播特性为NOT_SUPPORTED、SUPPORTS时,由于此时事务管理器并不会提前打开Conneciton并开启事务(即:也不会保存到ConnectionHolder)【从上图中就可以看出】,而是在执行一条SQL语句时,触发了MyBatis的第一次获取SqlSession,间接的执行了DataSourceUtils.doGetConnection(会保存到ConnectionHolder中),如果在方法中的执行第一条SQL语句前进行数据源切换,那么就可以生效,若在执行第一条SQL语句后再尝试切换,那么由于SqlSession已不是最新的(ConnectionHolder中已有Connection),则只会复用。

解决方案:

新增数据源切换执行器工具类:DataSourceSwitchInvoker,作用:在执行前会检查要切换的数据源与当前已持有的数据源(ConnectionHolder.Connection)是否一致,一致则直接执行回调方法(即:不存在切换数据源),不一致则挂起当前事务(挂事务与资源后,会清空事务同步上下文,就像从来没有执行过事务方法一样,默认状态),然后执行回调方法,最后恢复被挂起的事务与资源,并恢复回执行前的数据源设置。即:相当于在事务执行过程中,撕开一个口子(无任何状态),执行完成后,再恢复回事务的原状态,不影响后续的执行。

DataSourceSwitchInvoker.invokeOn 代码逻辑流程图:

(注:图片部份位置有屏蔽删减是因为我实现了多个版本,本次是简化实用版,无需复杂的设置,直接方法入参传入即可)

DataSourceSwitchInvoker 实现CODE:


/*** @author: zuowenjun* @description:数据源切换后执行器,解决在多数据源项目中,无法在事务方法中进行数据源切换问题*/
@Component
public class DataSourceSwitchInvoker {private static final Logger LOGGER = LoggerFactory.getLogger(DataSourceSwitchInvoker.class);private static final Map<String, String> DATA_SOURCE_NAME_WITH_URL_MAP = new HashMap<>();private static final String SET_BEFORE = "BEFORE";private static final String SET_AFTER = "AFTER";@Value("${dataSourceSwitchInvoker.settings.datasourceJdbcUrlPattern:}")private String datasourceJdbcUrlPattern;/*** 初始化必要条件:数据源配置集合(数据源名称与jdbcUrl对应关系)*/@PostConstructpublic void initializeRequirement() {if (StringUtils.isBlank(datasourceJdbcUrlPattern)) {LOGGER.warn("datasourceJdbcUrlPattern is null");return;}DATA_SOURCE_NAME_WITH_URL_MAP.clear();Map<String, String> configMap = getPropertiesByPattern(datasourceJdbcUrlPattern, value -> ObjectUtils.defaultIfNull(value, "").toString().trim(), (k, v) -> StringUtils.isNotEmpty(v));if (MapUtils.isEmpty(configMap)) {LOGGER.error("DataSourceSwitchInvoker.initializeRequirement configMap is empty ,datasourceJdbcUrlPattern: {}", datasourceJdbcUrlPattern);return;}DATA_SOURCE_NAME_WITH_URL_MAP.putAll(configMap);LOGGER.info("DataSourceSwitchInvoker.initializeRequirement ok");}/*** 在指定的数据源下执行回调方法** @param getCurrentDsNameFunc* @param setCurrentDsNameFunc* @param invokeCallback* @return*/public static <T> T invokeOn(String newDataSourceName, Supplier<String> getCurrentDsNameFunc, Consumer<String> setCurrentDsNameFunc, BiFunction<String, String, Boolean> checkSameDsNameFunc, Supplier<T> invokeCallback) {Assert.notNull(getCurrentDsNameFunc, "执行前获取数据源配置回调方法不能为空");Assert.notNull(setCurrentDsNameFunc, "执行前要设置的数据源配置回调方法不能为空");Assert.notNull(invokeCallback, "具体执行回调方法不能为空");String invokeId = "DSI" + System.currentTimeMillis();String oldDataSourceName = getCurrentDsNameFunc.get();setCurrentDsNameFunc.accept(newDataSourceName);LOGGER.info("DataSourceSwitchInvoker.invokeOn setCurrentDsName {} --> {} ,invokeId: {}", oldDataSourceName, newDataSourceName, invokeId);Object currentTransaction = null;Object suspendedResourcesHolder = null;PlatformTransactionManagerDelegateInner platformTransactionManagerDelegate = null;try {String currentDbConnectionUrl = TransactionManagerUtils.getCurrentDbConnectionUrl(null);if (StringUtils.isEmpty(currentDbConnectionUrl) || currentDbConnectionUrl.equalsIgnoreCase(DATA_SOURCE_NAME_WITH_URL_MAP.get(newDataSourceName))) {//若当前没有持有DB连接 或持有的DB连接与当前要设置的DB数据源相同,则表明无需额外处理,只需正常执行即可return invokeCallback.get();} else if (StringUtils.isNotEmpty(currentDbConnectionUrl) && checkSameDsNameFunc != null) {String currentUsedDataSourceName = DATA_SOURCE_NAME_WITH_URL_MAP.entrySet().stream().filter(kv -> currentDbConnectionUrl.equalsIgnoreCase(kv.getValue())).map(Map.Entry::getKey).findFirst().orElse(null);if (Boolean.TRUE.equals(checkSameDsNameFunc.apply(currentUsedDataSourceName, newDataSourceName))) {//若当前事务连接对应的已实际使用的数据源与要设置的数据源一致,则表明无需额外处理,只需正常执行即可return invokeCallback.get();}}//若持有DB连接,则需要先挂起当前事务或资源AbstractPlatformTransactionManager platformTransactionManager = SpringUtils.getBean(AbstractPlatformTransactionManager.class);Assert.notNull(platformTransactionManager, "not found AbstractPlatformTransactionManager bean");platformTransactionManagerDelegate = new PlatformTransactionManagerDelegateInner(platformTransactionManager);currentTransaction = TransactionManagerUtils.getCurrentTransaction(platformTransactionManager);if (!platformTransactionManagerDelegate.isExistingTransaction(currentTransaction)) {currentTransaction = null;}suspendedResourcesHolder = platformTransactionManagerDelegate.suspend(currentTransaction);LOGGER.debug("DataSourceSwitchInvoker.invokeOn suspend result is {} ,invokeId: {}", suspendedResourcesHolder != null, invokeId);return invokeCallback.get();} finally {String resumeSuspendedResources = null;//前面若有挂起事务或资源,则需在执行完方法后需恢复到当前事务状态if (currentTransaction != null || suspendedResourcesHolder != null) {platformTransactionManagerDelegate.resume(currentTransaction, suspendedResourcesHolder);resumeSuspendedResources = "resume suspendedResources ok";}setCurrentDsNameFunc.accept(oldDataSourceName);LOGGER.info("DataSourceSwitchInvoker.invokeOn end {} , recover setCurrentDsName {} --> {} ,invokeId: {}", resumeSuspendedResources, newDataSourceName, oldDataSourceName, invokeId);}}/*** 在指定的数据源下执行回调方法** @param setCurrentDsNameFunc* @param invokeCallback* @param <T>* @return*/public static <T> T invokeOn(Consumer<String> setCurrentDsNameFunc, Supplier<T> invokeCallback) {return invokeOn(SET_BEFORE, () -> SET_AFTER, setCurrentDsNameFunc, null, invokeCallback);}private static <T> Map<String, T> getPropertiesByPattern(String configPath, Function<Object, T> convertValueFunc, BiFunction<String, T, Boolean> filterFunc) {Assert.notNull(configPath, "param configPath not be null");Assert.notNull(convertValueFunc, "param convertValueFunc not be null");Map<String, T> resultMap = new HashMap<>();if (!(SpringUtils.getApplicationContext().getEnvironment() instanceof ConfigurableEnvironment)) {return resultMap;}ConfigurableEnvironment environment = (ConfigurableEnvironment) SpringUtils.getApplicationContext().getEnvironment();AntPathMatcher antPathMatcher = new AntPathMatcher(".");String configKey = "{configKey}";// 遍历所有的属性源for (PropertySource<?> propertySource : environment.getPropertySources()) {if (propertySource instanceof EnumerablePropertySource) {EnumerablePropertySource<?> enumerablePropertySource = (EnumerablePropertySource<?>) propertySource;// 遍历当前属性源中的所有属性for (String propertyName : enumerablePropertySource.getPropertyNames()) {if (antPathMatcher.match(configPath, propertyName)) {String key = propertyName;if (configPath.contains(configKey)) {key = antPathMatcher.extractUriTemplateVariables(configPath, propertyName).getOrDefault(configKey.replaceAll("[{}]", ""), "<null>");}T value = convertValueFunc.apply(enumerablePropertySource.getProperty(propertyName));if (filterFunc == null || filterFunc.apply(key, value)) {resultMap.put(key, convertValueFunc.apply(value));}}}}}return resultMap;}/*** 通过内部类在不破坏封装性、访问性的前提下,提供当前类内部的protected方法的访问能力*/private static class PlatformTransactionManagerDelegateInner extends PlatformTransactionManagerDelegate {public PlatformTransactionManagerDelegateInner(AbstractPlatformTransactionManager transactionManager) {super(transactionManager);}@Overrideprotected Object suspend(Object transaction) throws TransactionException {return super.suspend(transaction);}@Overrideprotected void resume(Object transaction, Object resourcesHolderObj) {super.resume(transaction, resourcesHolderObj);}@Overrideprotected boolean isExistingTransaction(Object transaction) {return super.isExistingTransaction(transaction);}}}

依赖CODE(注意包名路径需与AbstractPlatformTransactionManager、DataSourceTransactionManager一致):

//author: zuowenjun
//注意包名必需是如下,因为要访问protected方法
package org.springframework.jdbc.datasource;public class PlatformTransactionManagerDelegate {private final AbstractPlatformTransactionManager delegate;public PlatformTransactionManagerDelegate(AbstractPlatformTransactionManager transactionManager) {this.delegate = transactionManager;}protected Object suspend(Object transaction) throws TransactionException {return delegate.suspend(transaction);}protected void resume(Object transaction, Object resourcesHolderObj) {AbstractPlatformTransactionManager.SuspendedResourcesHolder resourcesHolder = (AbstractPlatformTransactionManager.SuspendedResourcesHolder) resourcesHolderObj;delegate.resume(transaction, resourcesHolder);}protected boolean isExistingTransaction(Object transaction) {return delegate.isExistingTransaction(transaction);}}//author: zuowenjun
//注意包名必需是如下,因为要访问protected方法
package org.springframework.transaction.support;public class TransactionManagerUtils {public static String getCurrentDbConnectionUrl(String threadLocalDbNameIfNoSet) {DataSource dataSource = SpringUtils.getBean(DataSource.class);if (dataSource == null) {return threadLocalDbNameIfNoSet;}ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource);if (conHolder == null || !conHolder.hasConnection()) {return threadLocalDbNameIfNoSet;}try {return conHolder.getConnection().getMetaData().getURL();} catch (Throwable e) {LOGGER.warn("TransactionManagerUtils.getCurrentDbConnectionUrl error", e);}return threadLocalDbNameIfNoSet;}public static Object getCurrentTransaction(AbstractPlatformTransactionManager transactionManager) {if (!(transactionManager instanceof DataSourceTransactionManager)) {throw new RuntimeException("only support DataSourceTransactionManager doGetTransaction");}DataSourceTransactionManager dsTransactionManager = (DataSourceTransactionManager) transactionManager;return dsTransactionManager.doGetTransaction();}}

其中:SpringUtils工具类是一个简单的实现了Spring上下文织入的接口然后赋值给静态字段,最终实现可以直接使用applicationContext.getBean(type)

使用示例CODE:

//假设这里是数据源的设置,tips:多数据源一般都是自定义实现了AbstractRoutingDataSource,然后使用ThreadLocal来保存设置当前要使用的数据源配置名称private ThreadLocal<String> dataSourceHolder = new ThreadLocal<>();@Transactional
public doWithTx(){//第一种方法:【推荐第一种】//假设之前是read_db 数据源,现在需要切换成master_dbDataSourceSwitchInvoker.invokeOn("master_db", () -> dataSourceHolder.get(), (dsName) -> dataSourceHolder.set(dsName), null, () -> {Object demo = null; //模拟 demoMapper.get(123L);return demo;});//第二种方法:(重载方法,一个设置数据源方法处理执行前、执行后的数据源设置)//假设之前是read_db 数据源,现在需要切换成master_dbAtomicReference<String> dsName = new AtomicReference<>();DataSourceSwitchInvoker.invokeOn(eventName -> {if (SET_BEFORE.equals(eventName)) {//执行前,自行记录之前的数据源dsName.set(dataSourceHolder.get());//设置新数据源dataSourceHolder.set("master_db");} else if (SET_AFTER.equals(eventName)) {//执行后,还原设置数据源dataSourceHolder.set(dsName.get());}}, () -> {Object demo = null; //模拟 demoMapper.get(123L);return demo;});
}

编码建议:

切换虽好用,但建议不要在切换的方法中进行写数据的操作,更适合仅用于临时需要查询其他数据源的数据时使用,以免破坏spring事务的完整性,因为invokeOn方法本身就是先挂起一个事务,然后开新连接执行新的操作DB的方法,最后还原恢复事务,若在其中又进行了其他的操作,可能存在未知风险,虽然理论做什么都可以但非常不建议。

经多种测试,无论是普通方法 OR 在事务中的方法,均能正常执行,简直就是YYDS!原创不易,如有帮助关注+点个赞吧v

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/889829.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java基础05(常用类)

匿名内部类 Object类 包装类 String类 BigDecimal类 Date类(特定时间)Calendar类(日历)SimpleDateFormat类(格式化时间)System类(系统类)Java基础05(常用类) 内部类 成员内部类在类的内部定义,与实例变量、实例方法同级别的类外部类的一个实例部分,创建内部类对象…

mysql表字段varchar(10)和varchar(255)测试文件占用

前言全局说明一、说明 1.1 环境: Windows 11 家庭版 23H2 22631.3737 MySQL: 服务器版本: 5.6.34 - MySQL Community Server (GPL) Navicat for MySQL: 10.1.71.2 测试样本 两个字段: id字段是 1~10位不等长度的随机数; num字段是 11~25位不等的随机数字;为了更好模拟实际使…

FreeRTOS高效应用实战

FreeRTOS高效应用实战 基于STM32CubeIDE生成对芯片移植好的FreeRTOS工程,使用HAL库编写FreeRTOS应用程序,实现FreeRTOS高效应用实战引入函数句柄的概念函数句柄(Function Handle)是编程中用于间接引用和操作函数的一种机制,其本质是将函数作为数据来传递和存储。以下是关于…

解决ZYNQ-7020开发板使用vitis编译uboot报错和无法正常调试的问题

整个学习过程是参考正点原子启明星开发板的2020.2版本嵌入式Linux开发指南,在学习uboot移植的时候遇到了问题。 新建工程和配置环境啥的和教程里都一样,就不罗嗦了,这里重点讲和教程不一样的地方(或者说教程里有问题的地方)。 新建工程后编译时遇到的报错 在按照教程新建ub…

markDown学习日记

标题 标题是通过#和一个空格来创建,标题的等级是通过#的个数来鉴别。 字体样式 进步进步进步 加粗效果由2个*前后包裹来实现 进步进步进步 斜体需要一个*来实现 (两者都实现需要三个*) 进步进步进步 删除需要两边都用波浪号 ~ 实现 引用明德新民 止于至善用>实…

LVM(Logical Volume Manager)

一. LVM概述 1. 什么是 LVM LVM(Logical Volume Manager,逻辑卷管理器)是 Linux 系统下的一种 存储管理 机制,能够灵活地管理磁盘分区。它提供了一种比传统分区管理(如fdisk、parted)更高级的存储管理方式,允许动态调整存储空间,方便扩展和缩减分区,而不会影响已有数据…

碎片

平板电脑和手机最大的区别就在于屏幕的大小, 一般手机屏幕的大小会在 3 英寸到 6 英寸之间, 而一般平板电脑屏幕的大小会在 7 英寸到 10 英寸之间。屏幕大小差距过大有可能会让同样的界面在视觉效果上有较大的差异, 比如一些界面在手机上看起来非常美观, 但在平板电脑上看起来就…

作业1 随笔

这个作业属于哪个课程 班级的链接这个作业要求在哪里 作业要求的链接这个作业的目标 学习使用markdown、博客园与GitHub,大致了解本学科1、自我介绍兴趣爱好:玩游戏,打羽毛球,喜欢拍照,喜欢理解学习软件工程专业的新东西 学习方面:主要学习Java,2、5个想弄懂的问题从事后…

NetPad:一个.NET开源、跨平台的C#编辑器

前言 今天大姚给大家分享一个基于.NET开源、跨平台的C#编辑器和游乐场:NetPad。 项目介绍 NetPad是一个基于.NET开源(MIT License)、跨平台的C#编辑器和游乐场,它允许用户立即运行C#代码,无需创建和管理项目。项目技术栈.NET:作为底层框架,提供强大的开发能力和跨平台支…

upload-labs/Pass-18条件竞争绕过

根据代码可以看出,Pass-18 先保存了文件然后再判断文件是否合法,不合法就删除文件; 其他文件上传漏洞都是先判断文件是否合法然后再保存文件 因此可以知道,我们上传的不合法的文件是可以传到服务器的,在上传到服务器和文件删除之间会有一个间隙 我们可以利用这个间隙来绕过…

14 Java的Stream流详解

Stream是[Java 8](https://so.csdn.net/so/search?q=Java 8&spm=1001.2101.3001.7020) API添加的一个新的抽象,称为流Stream,以一种声明性方式处理数据集合(侧重对于源数据计算能力的封装,并且支持序列与并行两种操作方式)Stream流是从支持数据处理操作的源生成的元素…