原理
TM两阶段:
阶段1:TM向TC申请全局事务,netty客户端发起了一次记录xid的请求
阶段2:TC协调之后,决定执行RM是否提交或者回滚。
spring公共组件部分
1、SeataAutoConfiguration类
利用springboot自动装配机制从spring.factories文件加载自动配置类SeataAutoConfiguration
org.springframework.boot.autoconfigure.EnableAutoConfiguration=io.seata.spring.boot.autoconfigure.SeataAutoConfiguration@ConditionalOnProperty(prefix = SEATA_PREFIX, name = "enabled", havingValue = "true", matchIfMissing = true)
@AutoConfigureAfter({SeataCoreAutoConfiguration.class})
public class SeataAutoConfiguration {private static final Logger LOGGER = LoggerFactory.getLogger(SeataAutoConfiguration.class);@Bean(BEAN_NAME_FAILURE_HANDLER)@ConditionalOnMissingBean(FailureHandler.class)public FailureHandler failureHandler() {return new DefaultFailureHandlerImpl();}@Bean@DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER})@ConditionalOnMissingBean(GlobalTransactionScanner.class)public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties, FailureHandler failureHandler,ConfigurableListableBeanFactory beanFactory,@Autowired(required = false) List<ScannerChecker> scannerCheckers) {if (LOGGER.isInfoEnabled()) {LOGGER.info("Automatically configure Seata");}// set bean factoryGlobalTransactionScanner.setBeanFactory(beanFactory);// add checkers// '/META-INF/services/io.seata.spring.annotation.ScannerChecker'GlobalTransactionScanner.addScannerCheckers(EnhancedServiceLoader.loadAll(ScannerChecker.class));// spring beansGlobalTransactionScanner.addScannerCheckers(scannerCheckers);// add scannable packagesGlobalTransactionScanner.addScannablePackages(seataProperties.getScanPackages());// add excludeBeanNamesGlobalTransactionScanner.addScannerExcludeBeanNames(seataProperties.getExcludesForScanning());// create global transaction scannerreturn new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup(), failureHandler);}
}
2、 GlobalTransactionScanner类
public class GlobalTransactionScanner extends AbstractAutoProxyCreatorimplements ConfigurationChangeListener, InitializingBean, ApplicationContextAware, DisposableBean
初始化TM、RM的netty客户端,提供aop能力
2-1、初始化TM、RM的netty客户端
io.seata.spring.annotation.GlobalTransactionScanner#afterPropertiesSet
spring-bean初始化赋值回调,这里初始化TM、RM的netty客户端,为后续发起请求做准备。
@Overridepublic void afterPropertiesSet() {if (disableGlobalTransaction) {if (LOGGER.isInfoEnabled()) {LOGGER.info("Global transaction is disabled.");}ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,(ConfigurationChangeListener)this);return;}if (initialized.compareAndSet(false, true)) {initClient();}}private void initClient() {if (LOGGER.isInfoEnabled()) {LOGGER.info("Initializing Global Transaction Clients ... ");}if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));}//init TMTMClient.init(applicationId, txServiceGroup, accessKey, secretKey);if (LOGGER.isInfoEnabled()) {LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);}//init RMRMClient.init(applicationId, txServiceGroup);if (LOGGER.isInfoEnabled()) {LOGGER.info("Resource Manager is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);}if (LOGGER.isInfoEnabled()) {LOGGER.info("Global Transaction Clients are initialized. ");}registerSpringShutdownHook();}
2-2、aop代理实现
io.seata.spring.annotation.GlobalTransactionScanner#wrapIfNecessary,递归调用适配代理,Seata重写了这个方法,TCC代理、AT代理使用的还是spring那一套代理区分方式(jdk还是cglib)
/*** The following will be scanned, and added corresponding interceptor:** TM:* @see io.seata.spring.annotation.GlobalTransactional // TM annotation* Corresponding interceptor:* @see io.seata.spring.annotation.GlobalTransactionalInterceptor#handleGlobalTransaction(MethodInvocation, AspectTransactional) // TM handler** GlobalLock:* @see io.seata.spring.annotation.GlobalLock // GlobalLock annotation* Corresponding interceptor:* @see io.seata.spring.annotation.GlobalTransactionalInterceptor#handleGlobalLock(MethodInvocation, GlobalLock) // GlobalLock handler** TCC mode:* @see io.seata.rm.tcc.api.LocalTCC // TCC annotation on interface* @see io.seata.rm.tcc.api.TwoPhaseBusinessAction // TCC annotation on try method* @see io.seata.rm.tcc.remoting.RemotingParser // Remote TCC service parser* Corresponding interceptor:* @see io.seata.spring.tcc.TccActionInterceptor // the interceptor of TCC mode*/@Overrideprotected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {// do checkersif (!doCheckers(bean, beanName)) {return bean;}try {synchronized (PROXYED_SET) {if (PROXYED_SET.contains(beanName)) {return bean;}interceptor = null;//check TCC proxyif (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {// init tcc fence clean task if enable useTccFenceTCCBeanParserUtils.initTccFenceCleanTask(TCCBeanParserUtils.getRemotingDesc(beanName), applicationContext);//TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCCinterceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,(ConfigurationChangeListener)interceptor);} else {Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);if (!existsAnnotation(new Class[]{serviceInterface})&& !existsAnnotation(interfacesIfJdk)) {return bean;}if (globalTransactionalInterceptor == null) {globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,(ConfigurationChangeListener)globalTransactionalInterceptor);}interceptor = globalTransactionalInterceptor;}LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());if (!AopUtils.isAopProxy(bean)) {bean = super.wrapIfNecessary(bean, beanName, cacheKey);} else {AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));int pos;for (Advisor avr : advisor) {// Find the position based on the advisor's order, and add to advisors by pospos = findAddSeataAdvisorPosition(advised, avr);advised.addAdvisor(pos, avr);}}PROXYED_SET.add(beanName);return bean;}} catch (Exception exx) {throw new RuntimeException(exx);}}
3、GlobalTransactionalInterceptor类