ShardingSphere如何轻松驾驭Seata柔性分布式事务?

news/2025/1/13 7:48:41/文章来源:https://www.cnblogs.com/JavaEdge/p/18549747

0 前文

上一文解析了 ShardingSphere 强一致性事务支持 XAShardingTransactionManager ,本文继续:

  • 讲解该类
  • 介绍支持柔性事务的 SeataATShardingTransactionManager

sharding-transaction-xa-core中关于 XAShardingTransactionManager,本文研究 XATransactionManager 和 ShardingConnection 类实现。

1 XAShardingTransactionManager

1.1 init

public void init(final DatabaseType databaseType, final Collection<ResourceDataSource> resourceDataSources) {for (ResourceDataSource each : resourceDataSources) {// 根据传入的 ResourceDataSource创建XATransactionDataSource并缓存cachedDataSources.put(each.getOriginalName(), new XATransactionDataSource(databaseType, each.getUniqueResourceName(), each.getDataSource(), xaTransactionManager));}// 对通过 SPI 创建的 XATransactionManager 也执行其 init 初始化xaTransactionManager.init();
}

1.2 其它方法

实现也简单:

@Override
public TransactionType getTransactionType() {return TransactionType.XA;
}@SneakyThrows
@Override
public boolean isInTransaction() {return Status.STATUS_NO_TRANSACTION != xaTransactionManager.getTransactionManager().getStatus();
}@Override
public Connection getConnection(final String dataSourceName) throws SQLException {return cachedDataSources.get(dataSourceName).getConnection();
}

1.3 事务操作相关

begin、commit 和 rollback直接委托保存在 XATransactionManager#TransactionManager 完成:

@SneakyThrows
@Override
public void begin() {xaTransactionManager.getTransactionManager().begin();
}@SneakyThrows
@Override
public void commit() {xaTransactionManager.getTransactionManager().commit();
}@SneakyThrows
@Override
public void rollback() {xaTransactionManager.getTransactionManager().rollback();
}

2 AtomikosTransactionManager

TransactionManager默认实现。

2.1 AtomikosXARecoverableResource

代表资源:

public final class AtomikosXARecoverableResource extends JdbcTransactionalResource {private final String resourceName;AtomikosXARecoverableResource(final String serverName, final XADataSource xaDataSource) {super(serverName, xaDataSource);resourceName = serverName;}// 比对SingleXAResource#ResourceName,确定是否在使用资源,此即设计包装 XAResource 的 SingleXAResource 类的原因@Overridepublic boolean usesXAResource(final XAResource xaResource) {return resourceName.equals(((SingleXAResource) xaResource).getResourceName());}
}

2.2 AtomikosXARecoverableResource

public final class AtomikosTransactionManager implements XATransactionManager {private final UserTransactionManager transactionManager = new UserTransactionManager();private final UserTransactionService userTransactionService = new UserTransactionServiceImp();@Overridepublic void init() {userTransactionService.init();}@Overridepublic void registerRecoveryResource(final String dataSourceName, final XADataSource xaDataSource) {userTransactionService.registerResource(new AtomikosXARecoverableResource(dataSourceName, xaDataSource));}@Overridepublic void removeRecoveryResource(final String dataSourceName, final XADataSource xaDataSource) {userTransactionService.removeResource(new AtomikosXARecoverableResource(dataSourceName, xaDataSource));}@Override@SneakyThrowspublic void enlistResource(final SingleXAResource xaResource) {transactionManager.getTransaction().enlistResource(xaResource);}@Overridepublic TransactionManager getTransactionManager() {return transactionManager;}@Overridepublic void close() {userTransactionService.shutdown(true);}
}

对 Atomikos 的 UserTransactionManager、UserTransactionService 简单调用,Atomikos#UserTransactionManager 实现 TransactionManager 接口,封装所有 TransactionManager 需要完成的工作。

看完 sharding-transaction-xa-atomikos-manager,再看 sharding-transaction-xa-bitronix-manager 工程。基于 bitronix 的 XATransactionManager 实现方案

3 BitronixXATransactionManager

public final class BitronixXATransactionManager implements XATransactionManager {private final BitronixTransactionManager bitronixTransactionManager = TransactionManagerServices.getTransactionManager();@Overridepublic void init() {}@SneakyThrows@Overridepublic void registerRecoveryResource(final String dataSourceName, final XADataSource xaDataSource) {ResourceRegistrar.register(new BitronixRecoveryResource(dataSourceName, xaDataSource));}@SneakyThrows@Overridepublic void removeRecoveryResource(final String dataSourceName, final XADataSource xaDataSource) {ResourceRegistrar.unregister(new BitronixRecoveryResource(dataSourceName, xaDataSource));}@SneakyThrows@Overridepublic void enlistResource(final SingleXAResource singleXAResource) {bitronixTransactionManager.getTransaction().enlistResource(singleXAResource);}@Overridepublic TransactionManager getTransactionManager() {return bitronixTransactionManager;}@Overridepublic void close() {bitronixTransactionManager.shutdown();}
}

XA两阶段提交核心类:

4 ShardingConnection

上图的整个流程源头ShardingConnection类,构造函数发现创建 ShardingTransactionManager 过程:

@Getter
public final class ShardingConnection extends AbstractConnectionAdapter {public ShardingConnection(...) {...shardingTransactionManager = runtimeContext.getShardingTransactionManagerEngine().getTransactionManager(transactionType);}
}

ShardingConnection多处用到上面创建的shardingTransactionManager。如:

createConnection

获取连接:

@Override
protected Connection createConnection(final String dataSourceName, final DataSource dataSource) throws SQLException {return isInShardingTransaction() ? shardingTransactionManager.getConnection(dataSourceName) : dataSource.getConnection();
}

isInShardingTransaction

判断是否在同一事务:

private boolean isInShardingTransaction() {return null != shardingTransactionManager && shardingTransactionManager.isInTransaction();
}

setAutoCommit

@Override
public void setAutoCommit(final boolean autoCommit) throws SQLException {if (TransactionType.LOCAL == transactionType) {super.setAutoCommit(autoCommit);return;}if (autoCommit && !shardingTransactionManager.isInTransaction() || !autoCommit && shardingTransactionManager.isInTransaction()) {return;}if (autoCommit && shardingTransactionManager.isInTransaction()) {shardingTransactionManager.commit();return;}if (!autoCommit && !shardingTransactionManager.isInTransaction()) {closeCachedConnections();shardingTransactionManager.begin();}
}

事务类型为本地事务时,直接调用 ShardingConnection 父类 AbstractConnectionAdapter#setAutoCommit 完成本地事务自动提交:

  • autoCommit=true 且运行在事务中,调shardingTransactionManager.commit()完成提交
  • autoCommit=false 且当前不在事务中时,调 shardingTransactionManager.begin() 启动事务

commit、rollback

类似setAutoCommit ,按事务类型决定是否进行分布式提交和回滚:

@Override
public void commit() throws SQLException {if (TransactionType.LOCAL == transactionType) {super.commit();} else {shardingTransactionManager.commit();}
}@Override
public void rollback() throws SQLException {if (TransactionType.LOCAL == transactionType) {super.rollback();} else {shardingTransactionManager.rollback();}
}

ShardingSphere提供两阶段提交的 XA 协议实现方案的同时,也实现柔性事务。看完 XAShardingTransactionManager,来看基于 Seata 框架的柔性事务 TransactionManager 实现类 SeataATShardingTransactionManager。

5 SeataATShardingTransactionManager

该类完全采用阿里Seata框架提供分布式事务特性,而非遵循类似 XA 这样的开发规范,所以代码实现比 XAShardingTransactionManager 类层结构简单,复杂性都屏蔽在了框架内部。

集成 Seata,先要初始化 TMClient、RMClient,在 Seata 内部,这两个客户端之间会基于RPC通信。

SeataATShardingTransactionManager#init的initSeataRPCClient初始化这俩客户端对象:

// 根据 seata.conf 创建配置对象
FileConfiguration configuration = new FileConfiguration("seata.conf");initSeataRPCClient() {String applicationId = configuration.getConfig("client.application.id");Preconditions.checkNotNull(applicationId, "please config application id within seata.conf file");String transactionServiceGroup = configuration.getConfig("client.transaction.service.group", "default");TMClient.init(applicationId, transactionServiceGroup);RMClient.init(applicationId, transactionServiceGroup);
}

Seata也提供一套构建在 JDBC 规范之上的实现策略,类似03文介绍的 ShardingSphere 与 JDBC 规范之间兼容性。

Seata使用DataSourceProxy、ConnectionProxy代理对象,如DataSourceProxy:

实现了自定义Resource接口,继承AbstractDataSourceProxy(最终实现JDBC的DataSource接口)。所以,初始化 Seata 框架时,也要根据输入 DataSource 对象构建 DataSourceProxy,并通过 DataSourceProxy 获取 ConnectionProxy。

init、getConnection

@Override
public void init(final DatabaseType databaseType, final Collection<ResourceDataSource> resourceDataSources) {// 初始化 Seata 客户端initSeataRPCClient();// 创建 DataSourceProxy 并放入Mapfor (ResourceDataSource each : resourceDataSources) {dataSourceMap.put(each.getOriginalName(), new DataSourceProxy(each.getDataSource()));}
}@Override
public Connection getConnection(final String dataSourceName) {// 根据 DataSourceProxy 获取 ConnectionProxyreturn dataSourceMap.get(dataSourceName).getConnection();
}

初始化后,提供了事务开启和提交相关的入口。Seata的GlobalTransaction是核心接口,封装了面向用户操作层的分布式事务访问入口:

public interface GlobalTransaction {void begin() throws TransactionException;void begin(int timeout) throws TransactionException;void begin(int timeout, String name) throws TransactionException;void commit() throws TransactionException;void rollback() throws TransactionException;GlobalStatus getStatus() throws TransactionException;String getXid();
}

ShardingSphere 作 GlobalTransaction 的用户层,也基于 GlobalTransaction 完成分布式事务操作。但 ShardingSphere 并未直接使用这层,而是设计位于sharding-transaction-base-seata-at的SeataTransactionHolder类,保存线程安全的 GlobalTransaction 对象。

SeataTransactionHolder

final class SeataTransactionHolder {private static final ThreadLocal<GlobalTransaction> CONTEXT = new ThreadLocal<>();static void set(final GlobalTransaction transaction) {CONTEXT.set(transaction);} static GlobalTransaction get() {return CONTEXT.get();}static void clear() {CONTEXT.remove();}
}

使用 ThreadLocal 确保对 GlobalTransaction 访问的线程安全性。

咋判断当前操作是否处于一个全局事务?Seata存在一个上下文对象RootContex保存参与者和发起者之间传播的 Xid:

  • 当事务发起者开启全局事务,将 Xid 填入 RootContext
  • 然后 Xid 沿服务调用链一直传播,进而填充到每个事务参与者进程的 RootContext
  • 事务参与者发现 RootContext 存在 Xid,就可知自己处于全局事务

因此,只需判断:

@Override
public boolean isInTransaction() {return null != RootContext.getXID();
}

Seata 也提供针对全局事务的上下文类 GlobalTransactionContext,可用:

  • getCurrent 获取一个 GlobalTransaction对象
  • 或通过 getCurrentOrCreate 在无法获取 GlobalTransaction 对象时新建一个

就不难理解如下实现了

begin

@Override
@SneakyThrows
public void begin() {// 创建一个 GlobalTransaction,保存到 SeataTransactionHolderSeataTransactionHolder.set(GlobalTransactionContext.getCurrentOrCreate());// 从 SeataTransactionHolder 获取一个 GlobalTransaction,并调 begin 启动事务SeataTransactionHolder.get().begin();SeataTransactionBroadcaster.collectGlobalTxId();
}

注意到最后的类:

SeataTransactionBroadcaster

保存 Seata 全局 Xid 的一个容器类。事务启动时收集全局 Xid 并进行保存,而在事务提交或回滚时清空这些 Xid。

class SeataTransactionBroadcaster {String SEATA_TX_XID = "SEATA_TX_XID";static void collectGlobalTxId() {if (RootContext.inGlobalTransaction()) {ShardingExecuteDataMap.getDataMap().put(SEATA_TX_XID, RootContext.getXID());}}static void broadcastIfNecessary(final Map<String, Object> shardingExecuteDataMap) {if (shardingExecuteDataMap.containsKey(SEATA_TX_XID) && !RootContext.inGlobalTransaction()) {RootContext.bind((String) shardingExecuteDataMap.get(SEATA_TX_XID));}}static void clear() {ShardingExecuteDataMap.getDataMap().remove(SEATA_TX_XID);}
}

因此

commit、rollback和close

实现就清楚了:

@Override
public void commit() {try {SeataTransactionHolder.get().commit();} finally {SeataTransactionBroadcaster.clear();SeataTransactionHolder.clear();}
}@Override
public void rollback() {try {SeataTransactionHolder.get().rollback();} finally {SeataTransactionBroadcaster.clear();SeataTransactionHolder.clear();}
}@Override
public void close() {dataSourceMap.clear();SeataTransactionHolder.clear();TmRpcClient.getInstance().destroy();RmRpcClient.getInstance().destroy();
}

sharding-transaction-base-seata-at 工程中的代码实际上就只有这些内容,这些内容也构成了在 ShardingSphere中 集成 Seata 框架的实现过程。

6 从源码到开发

本文给出应用程序咋集成 Seata 分布式事务框架的详细过程,ShardingSphere 提供一种模版实现。日常开发,若想在业务代码集成 Seata,可参考 SeataTransactionHolder、SeataATShardingTransactionManager 等核心代码,而无需太多修改。

7 总结

XAShardingTransactionManager理解难在从 ShardingConnection 到底层 JDBC 规范的整个集成和兼容过程。

8 集成Seata框架

参考 ShardingSphere 的实现:


1. 配置 Seata 环境

  • 配置文件准备: 创建 seata.conf 文件,定义 applicationIdtransactionServiceGroup 等参数。
  • 启动 Seata 服务: 启动 Seata Server 并确保其与数据库的事务协调机制正常工作。

2. 初始化 Seata 客户端

项目中初始化 TMClientRMClient,它们分别代表事务管理器和资源管理器:

FileConfiguration configuration = new FileConfiguration("seata.conf");
String applicationId = configuration.getConfig("client.application.id");
String transactionServiceGroup = configuration.getConfig("client.transaction.service.group", "default");
TMClient.init(applicationId, transactionServiceGroup);
RMClient.init(applicationId, transactionServiceGroup);

3. 数据源代理

构建 DataSourceProxy 使用 Seata 的 DataSourceProxy 对数据源进行代理。

DataSourceProxy dataSourceProxy = new DataSourceProxy(originalDataSource);

获取连接代理:从代理数据源中获取 ConnectionProxy,使每个数据库连接支持事务传播。

Connection connection = dataSourceProxy.getConnection();

4. 全局事务上下文管理

基于 GlobalTransactionContext 获取或创建事务对象:

GlobalTransaction transaction = GlobalTransactionContext.getCurrentOrCreate();

绑定全局事务 XID: 当事务发起时,将全局事务的 XID 存储在 RootContext 中:

RootContext.bind(transaction.getXid());

通过 RootContext 判断事务状态:

boolean isInTransaction = RootContext.inGlobalTransaction();

5. 事务操作实现

开启事务:

transaction.begin();

提交事务:

try {transaction.commit();
} finally {RootContext.unbind();
}

回滚事务:

try {transaction.rollback();
} finally {RootContext.unbind();
}

6. 整合业务逻辑

将分布式事务的核心逻辑封装在工具类中,例如 SeataTransactionHolder,以便方便地管理全局事务上下文:

SeataTransactionHolder.set(GlobalTransactionContext.getCurrentOrCreate());

7. 清理资源

在应用关闭时,清理客户端资源:

TmRpcClient.getInstance().destroy();
RmRpcClient.getInstance().destroy();

8. 注意事项

  • 确保所有数据源通过 DataSourceProxy 代理,避免事务管理失效。
  • 配置数据库支持 Undo Log 表,确保事务回滚记录正常存储。
  • 调试过程中,检查 Seata Server 日志和应用日志,定位事务协调的问题。

通过上述步骤,可以在业务代码中顺利集成 Seata,实现分布式事务管理,保障数据一致性。

关注我,紧跟本系列专栏文章,咱们下篇再续!

作者简介:魔都架构师,多家大厂后端一线研发经验,在分布式系统设计、数据平台架构和AI应用开发等领域都有丰富实践经验。

各大技术社区头部专家博主。具有丰富的引领团队经验,深厚业务架构和解决方案的积累。

负责:

  • 中央/分销预订系统性能优化
  • 活动&券等营销中台建设
  • 交易平台及数据中台等架构和开发设计
  • 车联网核心平台-物联网连接平台、大数据平台架构设计及优化
  • LLM Agent应用开发
  • 区块链应用开发
  • 大数据开发挖掘经验
  • 推荐系统项目

目前主攻市级软件项目设计、构建服务全社会的应用系统。

参考:

  • 编程严选网

本文由博客一文多发平台 OpenWrite 发布!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/834767.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

20222303 2024-2025-1 《网络与系统攻防技术》实验六实验报告

1.实验内容 1.1 本周学习内容回顾 使用了Metasploit框架,其是一个功能强大的渗透测试框架。在使用的过程当中,Metasploit 提供了种类繁多的攻击模块,涵盖了远程代码执行、服务拒绝、提权等多种攻击方式,支持对多种操作系统和应用程序进行测试。除了漏洞利用,它还具备强大的…

20222325 2024-2025-1 《网络与系统攻防技术》实验五实验报告

1.实验内容 1.从www.besti.edu.cn、baidu.com、sina.com.cn中选择一个DNS域名进行查询,获取如下信息: DNS注册人及联系方式;该域名对应IP地址;IP地址注册人及联系方式;IP地址所在国家、城市和具体地理位置。 PS:使用whois、dig、nslookup、traceroute、以及各类在线和离线…

识海社区打卡-6

这场掉回灰名了,本来必然上大分,原因无他-查寝 先来查波战绩:c题没过纯属是因为被查寝查了室友也是个不知变通的让查寝记我头上了,byd害我赶回去,本来这题必出 看看我最后一发提交错哪了 void solve() {int n;cin >> n;if (n % 2){if (n < 27){cout << -1 …

第七次高级程序语言设计作业

班级:https://edu.cnblogs.com/campus/fzu/2024C 作业要求:https://edu.cnblogs.com/campus/fzu/2024C/homework/13304 学号:102400115 姓名:洪育豪 7.1问题:无7.2问题:无7.3问题:无7.4问题:无 含义说明 int a; - 定义一个普通整型变量。 int a; - 定义一个指向整型变量…

工具“dotnet-cnblog”安装失败。此故障可能由以下原因导致

工具“dotnet-cnblog”安装失败。此故障可能由以下原因导致解决方法 我这边用的wifi 有问题,换到手机热点就好了。本文来自博客园,作者:lanwf,转载请注明原文链接:https://www.cnblogs.com/lccsdncnblogs/p/18549701

idea免费激活到2099年

idea免费激活到2099年 目录idea免费激活到2099年前言一去官网上下载idea二 下载idea激活工具并使用第一步点击scripts(uninstall-all-users.vbs)清除信息第二步点击(install.vbs)安装配置信息第三步打开(idea.vmoptions)拷贝配置信息到 idea配置中三激活idea1复制激活码 …

学期:2024-2025-1 学号:20241303 《 计算机基础与程序设计》第八周学习总结

作业信息这个作业属于哪个课程 <班级的链接>(如2024-2025-1-计算机基础与程序设计)这个作业要求在哪里 <作业要求的链接>(如2024-2025-1计算机基础与程序设计第八周作业)这个作业的目标 <写上具体方面> 计算机科学概论(第七版)第9章 并完成云班课测试,《…

DM multipath总结---基于LINUX 7

DM multipath总结---基于LINUX 7 DM multipath总结DM Multipath提供的功能: 冗余:DM Multipath 能够在主动/被动配置下提供故障转移。在主动/被动配置下,只有一半的路径在每次进行 I/O 时会被使用。若一条 I/O 路径的任一元素(电缆、交换器或者控制器)出现故障,DM Multi…

如果让你来设计网络

如果让你来设计网络 你是一台电脑,你的名字叫 A 很久很久之前,你不与任何其他电脑相连接,孤苦伶仃。 ​​ 直到有一天,你希望与另一台电脑 B 建立通信,于是你们各开了一个网口,用一根网线连接了起来。 ​​ 用一根网线连接起来怎么就能"通信"了呢?我可以给你讲…

Wincc 7.5SP1下VBA编程练习:批量设置看见权限

这一篇学习笔记我在新浪发表过,那边还在审核。在这里也记录一下。 前两天QQ群里面有人询问能不能快速的给WINCC画面上的控件设置操作权限,这个是比较容易的。比如有个画面有10个IO域,在VBA编辑器写下面的脚本:Sub IOField_PropertyTrigger1() Dim objects Dim obj Dim objd…

项目冲测6

项目冲测6这个作业属于哪个课程 计科12班这个作业的要求在哪里 项目冲测 一.团队简介 队名:菜鸟队姓名 学号 团队责任巴哈尔古丽吾甫尔 3222004679 协调团队工作,对作业任务进行整理分配,整理博客工具人努日曼姑丽阿卜来孜 3222004935 指定测试计划,对软件进行测试,漏洞整…

20222407 2024-2025-1 《网络与系统攻防技术》实验六实验报告

1.实验内容 1.1 本周学习内容回顾 Metasploit 是一个功能强大的渗透测试框架,广泛应用于网络安全领域。它为安全专家、渗透测试人员和红队提供了一个全面的工具集,支持漏洞利用、攻击模拟和安全评估。Metasploit 提供了丰富的攻击模块,涵盖了远程代码执行、服务拒绝、提权等…