文章目录
一、认识2PC - 两阶段提交
1、理论
理论性的东西,懒得再打一遍了,贴在这了:
分布式事务详解【分布式事务的几种解决方案】彻底搞懂分布式事务
关键的两张图:
下图展示了2PC的两个阶段,分成功和失败两个情况说明:
成功情况:
失败情况:
2、手撸XA-两阶段提交
(1)时序图
(2)代码实例
import com.mysql.cj.jdbc.JdbcConnection;
import com.mysql.cj.jdbc.MysqlXAConnection;
import com.mysql.cj.jdbc.MysqlXid;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;@SpringBootTest
public class MysqlXaTest {@Testpublic void testXa() {try {//获取员工库的连接以及资源管理器JdbcConnection employeeConnection = (JdbcConnection) DriverManager.getConnection("jdbc:mysql://localhost:3306/employee", "root", "rootroot");MysqlXAConnection employeeXAConnection = new MysqlXAConnection(employeeConnection, true);XAResource employeeXaResource = employeeXAConnection.getXAResource();//获取的员工薪资库的连接以及资源管理器JdbcConnection salaryConnection = (JdbcConnection) DriverManager.getConnection("jdbc:mysql://localhost:3306/salary", "root", "rootroot");MysqlXAConnection salaryXAConnection = new MysqlXAConnection(salaryConnection, true);XAResource salaryXaResource = salaryXAConnection.getXAResource();// 全局事务idbyte[] gtrid = "g00003".getBytes();// 分支事务idbyte[] bqual = "b00001".getBytes();// 标识,一般是个固定值int formatId = 1;//开启员工插入的分支事务Xid employeeXid = new MysqlXid(gtrid, bqual, formatId);employeeXaResource.start(employeeXid, XAResource.TMNOFLAGS);PreparedStatement preparedStatement = employeeConnection.prepareStatement("insert into employee (name, sex, level) values ('小10', '女', '7')");preparedStatement.execute();employeeXaResource.end(employeeXid, XAResource.TMSUCCESS);//开启员工薪资的分支事务byte[] salaryBqual = "b00002".getBytes();Xid salaryXid = new MysqlXid(gtrid, salaryBqual, formatId);salaryXaResource.start(salaryXid, XAResource.TMNOFLAGS);PreparedStatement salaryPreparedStatement = salaryConnection.prepareStatement("insert into employee_salary (employee_id, salary) values ('12', 7000)");salaryPreparedStatement.execute();salaryXaResource.end(salaryXid, XAResource.TMSUCCESS);//第一阶段-准备阶段int employeePrepareResult = employeeXaResource.prepare(employeeXid);int salaryPrepareResult = salaryXaResource.prepare(salaryXid);//第二阶段-根据准备阶段的结果。判断是要执行commit还是rollbackif (employeePrepareResult == XAResource.XA_OK && salaryPrepareResult == XAResource.XA_OK) {employeeXaResource.commit(employeeXid, false);salaryXaResource.commit(salaryXid, false);} else {employeeXaResource.rollback(employeeXid);salaryXaResource.rollback(salaryXid);}} catch (SQLException | XAException e) {throw new RuntimeException(e);}}
}
3、认识JTA
JTA(Java Transaction API):是Java平台上一个标准API,用于管理和控制分布式事务的执行流程。
核心类:
javax.transaction.UserTransaction
:暴露给应用使用,用来启动、提交、回滚事务。
javax.transaction.TransactionManager
:提供给事务管理器的接口,用于协调和控制分布式事务的执行过程。
javax.transaction.XAResource
:表示一个资源管理器,用于管理和操作资源。
javax.transaction.Xid
:用于唯一标识一个分布式事务。
4、今天的主角:Atomikos
Atomikos是一个开源的事务管理器,用于管理和控制分布式事务的执行过程。提供了一个可靠的、高性能的事务管理解决方案,可以与多种应用程序和数据库集成。
简单理解就是,Atomikos是可以集成在我们Java代码里面,和我们的业务代码绑定到同一个Java进程里面的一个事务管理器的框架,可以帮助我们业务程序去自行实现分布式事务。
Atomikos特点:支持分布式事务、支持多种web服务器、支持多种数据库、支持XA协议、提供高性能的事务管理。
Atomikos可以解决,在同一个应用下,连接多个数据库,实现分布式事务。
5、2PC存在的问题
1、TM单点问题。TM挂掉之后,无法回滚和提交。
2、资源锁定的问题。资源锁定之后,TM挂掉无法回滚和提交。
3、性能瓶颈。资源锁定时间长。
4、数据不一致问题。commit时成功状态不一致就会造成数据不一致。
二、Atomikos使用
1、依赖+配置
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>
server.port=8080spring.employee-datasource.driverClassName = com.mysql.jdbc.Driver
spring.employee-datasource.jdbc-url = jdbc:mysql://localhost:3306/employee?useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC
spring.employee-datasource.username = root
spring.employee-datasource.password = rootrootspring.salary-datasource.driverClassName = com.mysql.jdbc.Driver
spring.salary-datasource.jdbc-url = jdbc:mysql://localhost:3306/salary?useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC
spring.salary-datasource.username = root
spring.salary-datasource.password = rootrootlogging.level.com.atomikos = debug
2、定义AtomikosDataSourceBean数据源
import com.atomikos.jdbc.AtomikosDataSourceBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import javax.sql.DataSource;
import java.util.Properties;@Configuration
public class AtomikosDataSourceConfig {@Value("${spring.employee-datasource.jdbc-url}")private String employeeUrl;@Value("${spring.employee-datasource.username}")private String employeeUser;@Value("${spring.employee-datasource.password}")private String employeePassword;@Value("${spring.salary-datasource.jdbc-url}")private String salaryUrl;@Value("${spring.salary-datasource.username}")private String salaryUser;@Value("${spring.salary-datasource.password}")private String salaryPassword;/*** 定义两个数据源,分别对应两个数据库*/@Bean(name = "employeeDataSource")public DataSource employeeDataSource(){AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();atomikosDataSourceBean.setUniqueResourceName("employeeDataSource");atomikosDataSourceBean.setXaDataSourceClassName("com.mysql.cj.jdbc.MysqlXADataSource");Properties properties = new Properties();properties.setProperty("URL", employeeUrl);properties.setProperty("user", employeeUser);properties.setProperty("password", employeePassword);atomikosDataSourceBean.setXaProperties(properties);return atomikosDataSourceBean;}/*** 定义两个数据源,分别对应两个数据库*/@Bean(name = "salaryDataSource")public DataSource salaryDataSource(){AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();atomikosDataSourceBean.setUniqueResourceName("salaryDataSource");atomikosDataSourceBean.setXaDataSourceClassName("com.mysql.cj.jdbc.MysqlXADataSource");Properties properties = new Properties();properties.setProperty("URL", salaryUrl);properties.setProperty("user", salaryUser);properties.setProperty("password", salaryPassword);atomikosDataSourceBean.setXaProperties(properties);return atomikosDataSourceBean;}
}
3、定义事务管理器JtaTransactionManager
import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.jta.JtaTransactionManager;import javax.transaction.TransactionManager;
import javax.transaction.UserTransaction;@Configuration
public class AtomikosConfig {// JTA的事务管理@Bean(name = "userTransaction")public UserTransaction userTransaction() {return new UserTransactionImp();}@Bean(name = "atomikosTransactionManager")public TransactionManager atomikosTransactionManager() {return new UserTransactionManager();}/*** 事务管理器*/@Bean(name = "platformTransactionManager")@DependsOn({"userTransaction", "atomikosTransactionManager"})public PlatformTransactionManager transactionManager() {UserTransaction userTransaction = userTransaction();TransactionManager transactionManager = atomikosTransactionManager();return new JtaTransactionManager(userTransaction, transactionManager);}
}
4、MyBatis配置
import lombok.SneakyThrows;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import javax.sql.DataSource;@Configuration
@MapperScan(basePackages = "com.example.distributetransaction.dao", sqlSessionFactoryRef = "sqlSessionFactoryEmployee")
public class EmployeeMybatisConfig {@SneakyThrows@Beanpublic SqlSessionFactory sqlSessionFactoryEmployee(@Qualifier("employeeDataSource") DataSource dataSource) throws Exception {SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();factoryBean.setDataSource(dataSource);return factoryBean.getObject();}
}
import lombok.SneakyThrows;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import javax.sql.DataSource;@Configuration
@MapperScan(basePackages = "com.example.distributetransaction.dao1", sqlSessionFactoryRef = "sqlSessionFactorySalary")
public class SalaryMybatisConfig {@SneakyThrows@Beanpublic SqlSessionFactory sqlSessionFactorySalary(@Qualifier("salaryDataSource") DataSource dataSource) throws Exception {SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();factoryBean.setDataSource(dataSource);return factoryBean.getObject();}
}
5、验证
@Transactional(rollbackFor = Exception.class)
public String join(EmployeeEntity employeeEntity) {//第一步,插入员工基础信息employeeDao.insertEmployee(employeeEntity);//第二步,插入员工薪资employeeSalaryDao.insertEmployeeSalary(employeeEntity.getId(), employeeEntity.getSalary());int i = 1 / 0;return "员工入职成功";
}
三、Atomikos源码分析
1、@Transactional入口:TransactionInterceptor创建事务流程
- (1)Spring事务入口:@Transactional
- (2)TransactionInterceptor#invoke:Spring事务的代理拦截方法
- (3)TransactionAspectSupport#determineTransactionManager:确定事务管理器=>我们创建的JtaTransactionManager
- (4)TransactionAspectSupport#createTransactionIfNecessary:创建事务
- (5)AbstractPlatformTransactionManager#getTransaction:获取事务
- (6)JtaTransactionManager#doGetTransaction:获取事务,拿到JtaTransactionObject,里面封装了UserTransactionImp
- (7)JtaTransactionManager获取我们配置的UserTransactionImp
JtaTransactionManager#doGetTransaction:获取事务
2、启动事务
- (1)从AbstractPlatformTransactionManager#handleExistingTransaction调用AbstractPlatformTransactionManager#startTransaction开启事务
- (2)调用JtaTransactionManager#doBegin开启事务
- (3)调用JtaTransactionManager#doJtaBegin开启事务
- (4)调用UserTransactionImp#begin开启事务
- (5)最终调用的是UserTransactionManager#begin开启事务
- (6)调用TransactionManagerImp#begin()开启事务
- (7)调用CompositeTransactionManagerImp#createCompositeTransaction创建分布式事务
3、小总结:启动全局事务流程图
4、分支事务,业务流程执行过程
5、事务提交与回滚