Atomikos详解:数据库XA规范与Atomikos使用与源码分析

文章目录

  • 一、认识2PC - 两阶段提交
    • 1、理论
    • 2、手撸XA-两阶段提交
      • (1)时序图
      • (2)代码实例
    • 3、认识JTA
    • 4、今天的主角:Atomikos
    • 5、2PC存在的问题
  • 二、Atomikos使用
    • 1、依赖+配置
    • 2、定义AtomikosDataSourceBean数据源
    • 3、定义事务管理器JtaTransactionManager
    • 4、MyBatis配置
    • 5、验证
  • 三、Atomikos源码分析
    • 1、@Transactional入口:TransactionInterceptor创建事务流程
    • 2、启动事务
    • 3、小总结:启动全局事务流程图
    • 4、分支事务,业务流程执行过程
    • 5、事务提交与回滚

一、认识2PC - 两阶段提交

1、理论

理论性的东西,懒得再打一遍了,贴在这了:
分布式事务详解【分布式事务的几种解决方案】彻底搞懂分布式事务

关键的两张图:
下图展示了2PC的两个阶段,分成功和失败两个情况说明:
成功情况:
在这里插入图片描述

失败情况:
在这里插入图片描述

2、手撸XA-两阶段提交

(1)时序图

在这里插入图片描述

(2)代码实例

import com.mysql.cj.jdbc.JdbcConnection;
import com.mysql.cj.jdbc.MysqlXAConnection;
import com.mysql.cj.jdbc.MysqlXid;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;@SpringBootTest
public class MysqlXaTest {@Testpublic void testXa() {try {//获取员工库的连接以及资源管理器JdbcConnection employeeConnection = (JdbcConnection) DriverManager.getConnection("jdbc:mysql://localhost:3306/employee", "root", "rootroot");MysqlXAConnection employeeXAConnection = new MysqlXAConnection(employeeConnection, true);XAResource employeeXaResource = employeeXAConnection.getXAResource();//获取的员工薪资库的连接以及资源管理器JdbcConnection salaryConnection = (JdbcConnection) DriverManager.getConnection("jdbc:mysql://localhost:3306/salary", "root", "rootroot");MysqlXAConnection salaryXAConnection = new MysqlXAConnection(salaryConnection, true);XAResource salaryXaResource = salaryXAConnection.getXAResource();// 全局事务idbyte[] gtrid = "g00003".getBytes();// 分支事务idbyte[] bqual = "b00001".getBytes();// 标识,一般是个固定值int formatId = 1;//开启员工插入的分支事务Xid employeeXid = new MysqlXid(gtrid, bqual, formatId);employeeXaResource.start(employeeXid, XAResource.TMNOFLAGS);PreparedStatement preparedStatement = employeeConnection.prepareStatement("insert into employee (name, sex, level) values ('小10', '女', '7')");preparedStatement.execute();employeeXaResource.end(employeeXid, XAResource.TMSUCCESS);//开启员工薪资的分支事务byte[] salaryBqual = "b00002".getBytes();Xid salaryXid = new MysqlXid(gtrid, salaryBqual, formatId);salaryXaResource.start(salaryXid, XAResource.TMNOFLAGS);PreparedStatement salaryPreparedStatement = salaryConnection.prepareStatement("insert into employee_salary (employee_id, salary) values ('12', 7000)");salaryPreparedStatement.execute();salaryXaResource.end(salaryXid, XAResource.TMSUCCESS);//第一阶段-准备阶段int employeePrepareResult = employeeXaResource.prepare(employeeXid);int salaryPrepareResult = salaryXaResource.prepare(salaryXid);//第二阶段-根据准备阶段的结果。判断是要执行commit还是rollbackif (employeePrepareResult == XAResource.XA_OK && salaryPrepareResult == XAResource.XA_OK) {employeeXaResource.commit(employeeXid, false);salaryXaResource.commit(salaryXid, false);} else {employeeXaResource.rollback(employeeXid);salaryXaResource.rollback(salaryXid);}} catch (SQLException | XAException e) {throw new RuntimeException(e);}}
}

3、认识JTA

JTA(Java Transaction API):是Java平台上一个标准API,用于管理和控制分布式事务的执行流程。

核心类:
javax.transaction.UserTransaction:暴露给应用使用,用来启动、提交、回滚事务。
javax.transaction.TransactionManager:提供给事务管理器的接口,用于协调和控制分布式事务的执行过程。
javax.transaction.XAResource:表示一个资源管理器,用于管理和操作资源。
javax.transaction.Xid:用于唯一标识一个分布式事务。

4、今天的主角:Atomikos

Atomikos是一个开源的事务管理器,用于管理和控制分布式事务的执行过程。提供了一个可靠的、高性能的事务管理解决方案,可以与多种应用程序和数据库集成。

简单理解就是,Atomikos是可以集成在我们Java代码里面,和我们的业务代码绑定到同一个Java进程里面的一个事务管理器的框架,可以帮助我们业务程序去自行实现分布式事务。

Atomikos特点:支持分布式事务、支持多种web服务器、支持多种数据库、支持XA协议、提供高性能的事务管理。

Atomikos可以解决,在同一个应用下,连接多个数据库,实现分布式事务。

5、2PC存在的问题

1、TM单点问题。TM挂掉之后,无法回滚和提交。
2、资源锁定的问题。资源锁定之后,TM挂掉无法回滚和提交。
3、性能瓶颈。资源锁定时间长。
4、数据不一致问题。commit时成功状态不一致就会造成数据不一致。

在这里插入图片描述

二、Atomikos使用

1、依赖+配置

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>
server.port=8080spring.employee-datasource.driverClassName = com.mysql.jdbc.Driver
spring.employee-datasource.jdbc-url = jdbc:mysql://localhost:3306/employee?useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC
spring.employee-datasource.username = root
spring.employee-datasource.password = rootrootspring.salary-datasource.driverClassName = com.mysql.jdbc.Driver
spring.salary-datasource.jdbc-url = jdbc:mysql://localhost:3306/salary?useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC
spring.salary-datasource.username = root
spring.salary-datasource.password = rootrootlogging.level.com.atomikos = debug

2、定义AtomikosDataSourceBean数据源

import com.atomikos.jdbc.AtomikosDataSourceBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import javax.sql.DataSource;
import java.util.Properties;@Configuration
public class AtomikosDataSourceConfig {@Value("${spring.employee-datasource.jdbc-url}")private String employeeUrl;@Value("${spring.employee-datasource.username}")private String employeeUser;@Value("${spring.employee-datasource.password}")private String employeePassword;@Value("${spring.salary-datasource.jdbc-url}")private String salaryUrl;@Value("${spring.salary-datasource.username}")private String salaryUser;@Value("${spring.salary-datasource.password}")private String salaryPassword;/*** 定义两个数据源,分别对应两个数据库*/@Bean(name = "employeeDataSource")public DataSource employeeDataSource(){AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();atomikosDataSourceBean.setUniqueResourceName("employeeDataSource");atomikosDataSourceBean.setXaDataSourceClassName("com.mysql.cj.jdbc.MysqlXADataSource");Properties properties = new Properties();properties.setProperty("URL", employeeUrl);properties.setProperty("user", employeeUser);properties.setProperty("password", employeePassword);atomikosDataSourceBean.setXaProperties(properties);return atomikosDataSourceBean;}/*** 定义两个数据源,分别对应两个数据库*/@Bean(name = "salaryDataSource")public DataSource salaryDataSource(){AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();atomikosDataSourceBean.setUniqueResourceName("salaryDataSource");atomikosDataSourceBean.setXaDataSourceClassName("com.mysql.cj.jdbc.MysqlXADataSource");Properties properties = new Properties();properties.setProperty("URL", salaryUrl);properties.setProperty("user", salaryUser);properties.setProperty("password", salaryPassword);atomikosDataSourceBean.setXaProperties(properties);return atomikosDataSourceBean;}
}

3、定义事务管理器JtaTransactionManager

import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.jta.JtaTransactionManager;import javax.transaction.TransactionManager;
import javax.transaction.UserTransaction;@Configuration
public class AtomikosConfig {// JTA的事务管理@Bean(name = "userTransaction")public UserTransaction userTransaction() {return new UserTransactionImp();}@Bean(name = "atomikosTransactionManager")public TransactionManager atomikosTransactionManager() {return new UserTransactionManager();}/*** 事务管理器*/@Bean(name = "platformTransactionManager")@DependsOn({"userTransaction", "atomikosTransactionManager"})public PlatformTransactionManager transactionManager() {UserTransaction userTransaction = userTransaction();TransactionManager transactionManager = atomikosTransactionManager();return new JtaTransactionManager(userTransaction, transactionManager);}
}

4、MyBatis配置

import lombok.SneakyThrows;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import javax.sql.DataSource;@Configuration
@MapperScan(basePackages = "com.example.distributetransaction.dao", sqlSessionFactoryRef = "sqlSessionFactoryEmployee")
public class EmployeeMybatisConfig {@SneakyThrows@Beanpublic SqlSessionFactory sqlSessionFactoryEmployee(@Qualifier("employeeDataSource") DataSource dataSource) throws Exception {SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();factoryBean.setDataSource(dataSource);return factoryBean.getObject();}
}
import lombok.SneakyThrows;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import javax.sql.DataSource;@Configuration
@MapperScan(basePackages = "com.example.distributetransaction.dao1", sqlSessionFactoryRef = "sqlSessionFactorySalary")
public class SalaryMybatisConfig {@SneakyThrows@Beanpublic SqlSessionFactory sqlSessionFactorySalary(@Qualifier("salaryDataSource") DataSource dataSource) throws Exception {SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();factoryBean.setDataSource(dataSource);return factoryBean.getObject();}
}

5、验证

@Transactional(rollbackFor = Exception.class)
public String join(EmployeeEntity employeeEntity) {//第一步,插入员工基础信息employeeDao.insertEmployee(employeeEntity);//第二步,插入员工薪资employeeSalaryDao.insertEmployeeSalary(employeeEntity.getId(), employeeEntity.getSalary());int i = 1 / 0;return "员工入职成功";
}

三、Atomikos源码分析

1、@Transactional入口:TransactionInterceptor创建事务流程

  • (1)Spring事务入口:@Transactional
  • (2)TransactionInterceptor#invoke:Spring事务的代理拦截方法
  • (3)TransactionAspectSupport#determineTransactionManager:确定事务管理器=>我们创建的JtaTransactionManager
  • (4)TransactionAspectSupport#createTransactionIfNecessary:创建事务
  • (5)AbstractPlatformTransactionManager#getTransaction:获取事务
  • (6)JtaTransactionManager#doGetTransaction:获取事务,拿到JtaTransactionObject,里面封装了UserTransactionImp
  • (7)JtaTransactionManager获取我们配置的UserTransactionImp
    在这里插入图片描述
    在这里插入图片描述

在这里插入图片描述
JtaTransactionManager#doGetTransaction:获取事务
在这里插入图片描述
在这里插入图片描述

2、启动事务

  • (1)从AbstractPlatformTransactionManager#handleExistingTransaction调用AbstractPlatformTransactionManager#startTransaction开启事务
  • (2)调用JtaTransactionManager#doBegin开启事务
  • (3)调用JtaTransactionManager#doJtaBegin开启事务
  • (4)调用UserTransactionImp#begin开启事务
  • (5)最终调用的是UserTransactionManager#begin开启事务
  • (6)调用TransactionManagerImp#begin()开启事务
  • (7)调用CompositeTransactionManagerImp#createCompositeTransaction创建分布式事务
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

3、小总结:启动全局事务流程图

在这里插入图片描述

4、分支事务,业务流程执行过程

在这里插入图片描述

5、事务提交与回滚

在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/518164.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

设计模式之模版方法实践

模版方法实践案例 实践之前还是先了解一下模版方法的定义 定义 模板方法模式是一种行为设计模式&#xff0c;它定义了一个骨架&#xff0c;并允许子类在不改变结构的情况下重写的特定步骤。模板方法模式通过在父类中定义一个模板方法&#xff0c;其中包含了主要步骤&#xf…

Batch Nomalization 迁移学习

Batch Nomalization 1.Batch Nomalization原理 图像预处理过程中通常会对图像进行标准化处理&#xff0c;这样能够加速网络的收敛。就是按照channel去求均值和方差&#xff0c;然后原数据减均值除标准差&#xff0c;使我们的feature map满足均值为0&#xff0c;方差为1的分布…

【教程】HBuilderX开发实践:隐私合规检测问题解决方案

文章目录 摘要引言正文1、违规收集个人信息2、APP强制、频繁、过度索取权限 知识点补充总结 摘要 本篇博客介绍了在使用HBuilderX进行开发过程中&#xff0c;常遇到的隐私合规问题&#xff0c;并提供了相应的解决方案。主要包括违规收集个人信息和APP强制、频繁、过度索取权限…

C#知识点-21(初识数据库)

数据库与内存、文件的比较 内存&#xff1a; 优点&#xff1a;存取速度快 缺点&#xff1a;-容量小 -断电后&#xff0c;数据不会保存 文件&#xff1a; 优点&#xff1a;数据可以持久化保存 缺点&#xff1a;-读取速度慢…

3款让人难以置信的软件,纯国产,真实用

闲话休提&#xff0c;直上狠货。 1、知犀思维导图 知犀思维导图是一款国产的优质思维导图工具&#xff0c;它能够帮助你捕捉每一个灵感瞬间&#xff0c;界面简洁易用&#xff0c;支持多人协作编辑&#xff0c;无论是理清思路、记录灵感、制定计划还是做笔记&#xff0c;都能轻…

Linux中线程的实现,线程的接口相关函数pthread_create、pthread_join、pthread_exit

目录 一.线程的概念 二.操作系统中线程的实现 三.Linux中线程的实现 四.进程与线程的区别 五.线程的接口相关函数 5.1 pthread_create 5.2 pthread_join 5.3 pthread_exit 六.代码演示 七.如何解决上述问题&#xff1f; 方案1. 方案2. 方案3. 一.线程的概念 进程是…

spring-data-elasticsearch官方文档解读(部分)

Spring Data Elasticsearch 这里主要学习的是4.4.16版本的文档 1. 版本 下表显示了 Spring Data 发行版系列使用的 Elasticsearch 版本和其中包含的 Spring Data Elasticsearch 版本&#xff0c;以及引用该特定 Spring Data 发行版系列的 Spring Boot 版本。给出的 Elastics…

数据结构——lesson6二叉树基础

前言 hellohello~这里是土土数据结构学习笔记&#x1f973;&#x1f973; &#x1f4a5;个人主页&#xff1a;大耳朵土土垚的博客 &#x1f4a5; 所属专栏&#xff1a;数据结构学习笔记 &#x1f4a5;对于数据结构顺序表链表有疑问的都可以在上面数据结构的专栏进行学习哦~感…

优思学院|使用完全数据计算CPK需要分子组吗?

使用全部数据进行计算&#xff0c;那么这种计算更类似于评估过程的PPK&#xff0c;PPK与计算CPK是不一样的&#xff0c;因为当数据以子组形式收集时&#xff0c;可以很容易地根据每个子组的范围或子组标准差来计算各个子组内的变异性。这是因为每个子组包含多个数据点&#xff…

你所需要的是 Wide Events,而不是 “Metrics、Logs 和 Traces”

原文[0] &#xff1a;Ivan Burmistrov - 2024.02.15 这段引自 Charity Majors 的话&#xff0c;或许是对当前科技行业可观测性状况的最佳概括——一个全面的、大规模的混乱。每个人都感到困惑&#xff0c;什么是 trace&#xff1f;什么是 span&#xff1f;一条 log 是 span 吗…

【C++庖丁解牛】模版初阶

&#x1f4d9; 作者简介 &#xff1a;RO-BERRY &#x1f4d7; 学习方向&#xff1a;致力于C、C、数据结构、TCP/IP、数据库等等一系列知识 &#x1f4d2; 日后方向 : 偏向于CPP开发以及大数据方向&#xff0c;欢迎各位关注&#xff0c;谢谢各位的支持 目录 1. 泛型编程2. 函数模…

Python Module level import not at top of file (E402)

Python Module level import not at top of file 引言正文 引言 这里给大家简单介绍一下当我们使用 Pycharm 编译器时遇到的 Python Module level import not at top of file 提醒。 正文 请看下图&#xff1a; 这时就会提示我们这个信息&#xff0c;并且 import 下面会出…