您现在的位置是:亿华云 > 应用开发

分布式事务(Seata)原理详解篇

亿华云2025-10-03 07:02:09【应用开发】9人已围观

简介今天这篇,就给大家分析一下Seata的源码是如何一步一步实现的。读源码的时候我们需要俯瞰起全貌,不要去扣一个一个的细节,这样我们学习起来会快捷而且有效率,我们学习源码需要掌握的是整体思路和核心点。首先

今天这篇,分布就给大家分析一下Seata的式事源码是如何一步一步实现的。读源码的原理时候我们需要俯瞰起全貌,不要去扣一个一个的详解细节,这样我们学习起来会快捷而且有效率,分布我们学习源码需要掌握的式事是整体思路和核心点。

首先 Seata 客户端启动一般分为以下几个流程:

自动加载Bean属性和配置信息。原理初始化TM。详解初始化RM。分布初始化分布式事务客户端完成,式事完成代理数据库配置。原理连接TC(Seata服务端),详解注册RM和TM。分布开启全局事务。式事

在这篇源码的原理讲解中,我们主要以AT模式为主导,官网也是主推AT模式,我们在上篇的文章中也讲解过,感兴趣的小伙伴可以去看一看​​分布式事务(Seata) 四大模式详解​​,在官网中也提供了对应的流程地址:https://seata.io/zh-cn/docs/dev/mode/at-mode.html ,在这里我们只是做一些简单的亿华云介绍,AT模式主要分为两个阶段:

一阶段:解析SQL,获取SQL类型(CRUD)、表信息、条件(where) 等相关信息。查询前镜像(改变之前的数据),根据解析得到的条件信息,生成查询语句,定位数据。执行业务SQL,更新数据。查询后镜像(改变后的数据),根据前镜像的结果,通过主键都给你为数据。插入回滚日志,将前后镜像数据以及业务SQL等信息,组织成一条回滚日志记录,插入到undo Log表中。提交前,向TC注册分支,申请全局锁。本地事务提交,业务数据的更细腻和生成的undoLog一起提交。将本地事务提交的结果通知给TC。二阶段:

如果TC收到的是网站模板回滚请求:

开启本地事务,通过XID和BranchID查找到对应的undo Log记录。根据undoLog中的前镜像和业务SQL的相关信息生成并执行回滚语句。提交本地事务,将本地事务的执行结果(分支事务回滚的信息)通知给TC。

如果没问题,执行提交操作:

收到TC分支提交请求,将请求放入到一个异步任务的队列中,马上返回提交成功的结果给TC。异步任务阶段的分支提交请求删除undoLog中记录。

源码入口

接下来,我们就需要从官网中去下载源码,下载地址:https://seata.io/zh-cn/blog/download.html,选择 source 即可,下载完成之后,通过IDEA打开项目。

源码下载下来之后,我们应该如何去找入口呢?首先我们需要找到对应引入的 Seata 包 spring-alibaba-seata,我们在回想一下,我们开启事务的时候,是不是添加过一个@GlobalTransactional的源码库注解,这个注解就是我们入手的一个点,我们在 spring.factories 中看到有一个 GlobalTransactionAutoConfiguration,这个就是我们需要关注的点,也就是我们源码的入口。

在 GlobalTransactionAutoConfiguration 中我们找到一个用Bean注入的方法 globalTransactionScanner ,这个就是全局事务扫描器,这个类型主要负责加载配置,注入相关的Bean。

这里给大家展示了当前GlobalTransactionScanner的类关系图,其中我们现在继承了Aop的AbstractAutoProxyCreator类型,在这其中有一个重点方法,这个方法就是判断Bean对象是否需要代理,是否需要增强。

@Configuration

@EnableConfigurationProperties(SeataProperties.class)

public class GlobalTransactionAutoConfiguration {

//全局事务扫描器

@Bean

public GlobalTransactionScanner globalTransactionScanner() {

String applicationName = applicationContext.getEnvironment()

.getProperty("spring.application.name");

String txServiceGroup = seataProperties.getTxServiceGroup();

if (StringUtils.isEmpty(txServiceGroup)) {

txServiceGroup = applicationName + "-fescar-service-group";

seataProperties.setTxServiceGroup(txServiceGroup);

}

// 构建全局扫描器,传入参数:应用名、事务分组名,失败处理器

return new GlobalTransactionScanner(applicationName, txServiceGroup);

}

}

在这其中我们要关心的是 GlobalTransactionScanner 这个类型,这个类型扫描 @GlobalTransactional 注解,并对代理方法进行拦截增强事务的功能。我们就从源码中搜索这个GlobalTransactionScanner类,看看里面具体是做了什么。

/

**

* The type Global transaction scanner.

* 全局事务扫描器

* @author slievrly

*/

public class GlobalTransactionScanner

//AbstractAutoProxyCreator AOP动态代理 增强Bean

extends AbstractAutoProxyCreator

/

**

* ConfigurationChangeListener: 监听器基准接口

* InitializingBean: Bean初始化

* ApplicationContextAware: Spring容器

* DisposableBean: Spring 容器销毁

*/

implements ConfigurationChangeListener, InitializingBean, ApplicationContextAware, DisposableBean {

private final String applicationId;//服务名

private final String txServiceGroup;//事务分组

private void initClient() {

//启动日志

if (LOGGER.isInfoEnabled()) {

LOGGER.info("Initializing Global Transaction Clients ... ");

}

//检查应用名以及事务分组名,为空抛出异常IllegalArgumentException

if (DEFAULT_TX_GROUP_OLD.equals(txServiceGroup)) {

LOGGER.warn("the default value of seata.tx-service-group: { } has already changed to { } since Seata 1.5, " +

"please change your default configuration as soon as possible " +

"and we dont recommend you to use default tx-service-groups value provided by seata",

DEFAULT_TX_GROUP_OLD, DEFAULT_TX_GROUP);

}

if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {

throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));

}

//init TM

//初始化TM

TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);

if (LOGGER.isInfoEnabled()) {

LOGGER.info("Transaction Manager Client is initialized. applicationId[{ }] txServiceGroup[{ }]", applicationId, txServiceGroup);

}

//init RM

//初始化RM

RMClient.init(applicationId, txServiceGroup);

if (LOGGER.isInfoEnabled()) {

LOGGER.info("Resource Manager is initialized. applicationId[{ }] txServiceGroup[{ }]", applicationId, txServiceGroup);

}

if (LOGGER.isInfoEnabled()) {

LOGGER.info("Global Transaction Clients are initialized. ");

}

registerSpringShutdownHook();

}

@Override

public void afterPropertiesSet() {

if (disableGlobalTransaction) {

if (LOGGER.isInfoEnabled()) {

LOGGER.info("Global transaction is disabled.");

}

ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,

(ConfigurationChangeListener)this);

return;

}

if (initialized.compareAndSet(false, true)) {

initClient();

}

}

private void initClient() {

//启动日志

if (LOGGER.isInfoEnabled()) {

LOGGER.info("Initializing Global Transaction Clients ... ");

}

//检查应用名以及事务分组名,为空抛出异常IllegalArgumentException

if (DEFAULT_TX_GROUP_OLD.equals(txServiceGroup)) {

LOGGER.warn("the default value of seata.tx-service-group: { } has already changed to { } since Seata 1.5, " +

"please change your default configuration as soon as possible " +

"and we dont recommend you to use default tx-service-groups value provided by seata",

DEFAULT_TX_GROUP_OLD, DEFAULT_TX_GROUP);

}

//检查应用名以及事务分组名,为空抛出异常IllegalArgumentException

if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {

throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));

}

//init TM

//初始化TM

TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);

if (LOGGER.isInfoEnabled()) {

LOGGER.info("Transaction Manager Client is initialized. applicationId[{ }] txServiceGroup[{ }]", applicationId, txServiceGroup);

}

//init RM

//初始化RM

RMClient.init(applicationId, txServiceGroup);

if (LOGGER.isInfoEnabled()) {

LOGGER.info("Resource Manager is initialized. applicationId[{ }] txServiceGroup[{ }]", applicationId, txServiceGroup);

}

if (LOGGER.isInfoEnabled()) {

LOGGER.info("Global Transaction Clients are initialized. ");

}

registerSpringShutdownHook();

}

//代理增强,Spring 所有的Bean都会经过这个方法

@Override

protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {

// do checkers

//检查bean和beanName

if (!doCheckers(bean, beanName)) {

return bean;

}

try {

//加锁防止并发

synchronized (PROXYED_SET) {

if (PROXYED_SET.contains(beanName)) {

return bean;

}

interceptor = null;

//check TCC proxy

//检查是否为TCC模式

if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {

// init tcc fence clean task if enable useTccFence

//如果启用useTccFence 失败 ,则初始化TCC清理任务

TCCBeanParserUtils.initTccFenceCleanTask(TCCBeanParserUtils.getRemotingDesc(beanName), applicationContext);

//TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC

//如果是,添加TCC拦截器

interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));

ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,

(ConfigurationChangeListener)interceptor);

} else {

//不是TCC

Class serviceInterface = SpringProxyUtils.findTargetClass(bean);

Class [] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);

//判断是否有相关事务注解,如果没有不进行代理

if (!existsAnnotation(new Class[]{ serviceInterface})

&& !existsAnnotation(interfacesIfJdk)) {

return bean;

}

//发现存在全局事务注解标注的Bean对象,添加拦截器

if (globalTransactionalInterceptor == null) {

//添加拦截器

globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);

ConfigurationCache.addConfigListener(

ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,

(ConfigurationChangeListener)globalTransactionalInterceptor);

}

interceptor = globalTransactionalInterceptor;

}

LOGGER.info("Bean[{ }] with name [{ }] would use interceptor [{ }]", bean.getClass().getName(), beanName, interceptor.getClass().getName());

//检查是否为代理对象

if (!AopUtils.isAopProxy(bean)) {

//不是代理对象,调用父级

bean = super.wrapIfNecessary(bean, beanName, cacheKey);

} else {

//是代理对象,反射获取代理类中已经存在的拦截器组合,然后添加到这个集合中

AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);

Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));

int pos;

for (Advisor avr : advisor) {

// Find the position based on the advisors order, and add to advisors by pos

pos = findAddSeataAdvisorPosition(advised, avr);

advised.addAdvisor(pos, avr);

}

}

PROXYED_SET.add(beanName);

return bean;

}

} catch (Exception exx) {

throw new RuntimeException(exx);

}

}

}

InitializingBean:中实现了一个 afterPropertiesSet()方法,在这个方法中,调用了initClient()。

AbstractAutoProxyCreator:APO动态代理,在之前的的Nacos和Sentiel中都有这个代理类,AOP在我们越往深入学习,在学习源码的会见到的越来越多,越来越重要,很多相关代理,都是通过AOP进行增强,在这个类中,我们需要关注有一个wrapIfNecessary()方法, 这个方法主要是判断被代理的bean或者类是否需要代理增强,在这个方法中会调用GlobalTransactionalInterceptor.invoke()进行带来增强。

具体代码如下:

public class GlobalTransactionalInterceptor implements ConfigurationChangeListener, MethodInterceptor, SeataInterceptor {

public GlobalTransactionalInterceptor(FailureHandler failureHandler) {

this.failureHandler = failureHandler == null ? DEFAULT_FAIL_HANDLER : failureHandler;

this.disable = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,

DEFAULT_DISABLE_GLOBAL_TRANSACTION);

this.order =

ConfigurationFactory.getInstance().getInt(ConfigurationKeys.TM_INTERCEPTOR_ORDER, TM_INTERCEPTOR_ORDER);

degradeCheck = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.CLIENT_DEGRADE_CHECK,

DEFAULT_TM_DEGRADE_CHECK);

if (degradeCheck) {

ConfigurationCache.addConfigListener(ConfigurationKeys.CLIENT_DEGRADE_CHECK, this);

degradeCheckPeriod = ConfigurationFactory.getInstance()

.getInt(ConfigurationKeys.CLIENT_DEGRADE_CHECK_PERIOD, DEFAULT_TM_DEGRADE_CHECK_PERIOD);

degradeCheckAllowTimes = ConfigurationFactory.getInstance()

.getInt(ConfigurationKeys.CLIENT_DEGRADE_CHECK_ALLOW_TIMES, DEFAULT_TM_DEGRADE_CHECK_ALLOW_TIMES);

EVENT_BUS.register(this);

if (degradeCheckPeriod > 0 && degradeCheckAllowTimes > 0) {

startDegradeCheck();

}

}

this.initDefaultGlobalTransactionTimeout();

}

@Override

public Object invoke(final MethodInvocation methodInvocation) throws Throwable {

//获取执行的方法

Class targetClass =

methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;

Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);

if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {

final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);

//获取GlobalTransactional(全局事务)、GlobalLock(全局锁)元数据

final GlobalTransactional globalTransactionalAnnotation =

getAnnotation(method, targetClass, GlobalTransactional.class);

//GlobalLock会将本地事务的执行纳入Seata分布式事务的管理,共同竞争全局锁

//保证全局事务在执行的时候,本地事务不可以操作全局事务的记录

final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);//获取全局锁

boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);

if (!localDisable) {

if (globalTransactionalAnnotation != null || this.aspectTransactional != null) {

AspectTransactional transactional;

if (globalTransactionalAnnotation != null) {

transactional = new AspectTransactional(globalTransactionalAnnotation.timeoutMills(),

globalTransactionalAnnotation.name(), globalTransactionalAnnotation.rollbackFor(),

globalTransactionalAnnotation.noRollbackForClassName(),

globalTransactionalAnnotation.noRollbackFor(),

globalTransactionalAnnotation.noRollbackForClassName(),

globalTransactionalAnnotation.propagation(),

globalTransactionalAnnotation.lockRetryInterval(),

globalTransactionalAnnotation.lockRetryTimes());

} else {

transactional = this.aspectTransactional;

}

//执行全局事务

return handleGlobalTransaction(methodInvocation, transactional);

} else if (globalLockAnnotation != null) {

//执行全局锁

return handleGlobalLock(methodInvocation, globalLockAnnotation);

}

}

}

return methodInvocation.proceed();

}

}

具体流程图如下所示:

核心源码

在上面我们讲解到 GlobalTransactionalInterceptor 作为全局事务拦截器,一旦执行拦截,就会进入invoke方法,其中,我们会做 @GlobalTransactional 注解的判断,如果有这个注解的存在,会执行全局事务和全局锁,再执行全局事务的时候会调用 handleGlobalTransaction 全局事务处理器,获取事务信息,那我们接下来就来看一下 GlobalTransactionalInterceptor.handleGlobalTransaction 到底是如何执行全局事务的。

Object handleGlobalTransaction(final MethodInvocation methodInvocation,

final AspectTransactional aspectTransactional) throws Throwable {

boolean succeed = true;

try {

return transactionalTemplate.execute(new TransactionalExecutor() {

@Override

public Object execute() throws Throwable {

return methodInvocation.proceed();

}

//获取事务名称,默认获取方法名

public String name() {

String name = aspectTransactional.getName();

if (!StringUtils.isNullOrEmpty(name)) {

return name;

}

return formatMethod(methodInvocation.getMethod());

}

/

**

* 解析GlobalTransation注解属性,封装对对象

* @return

*/

@Override

public TransactionInfo getTransactionInfo() {

// reset the value of timeout

//获取超时时间,默认60秒

int timeout = aspectTransactional.getTimeoutMills();

if (timeout <= 0 || timeout == DEFAULT_GLOBAL_TRANSACTION_TIMEOUT) {

timeout = defaultGlobalTransactionTimeout;

}

//构建事务信息对象

TransactionInfo transactionInfo = new TransactionInfo();

transactionInfo.setTimeOut(timeout);//超时时间

transactionInfo.setName(name());//事务名称

transactionInfo.setPropagation(aspectTransactional.getPropagation());//事务传播

transactionInfo.setLockRetryInterval(aspectTransactional.getLockRetryInterval());//校验或占用全局锁重试间隔

transactionInfo.setLockRetryTimes(aspectTransactional.getLockRetryTimes());//校验或占用全局锁重试次数

SetrollbackRules = new LinkedHashSet<>();

//其他构建信息

for (Class rbRule : aspectTransactional.getRollbackFor()) {

rollbackRules.add(new RollbackRule(rbRule));

}

for (String rbRule : aspectTransactional.getRollbackForClassName()) {

rollbackRules.add(new RollbackRule(rbRule));

}

for (Class rbRule : aspectTransactional.getNoRollbackFor()) {

rollbackRules.add(new NoRollbackRule(rbRule));

}

for (String rbRule : aspectTransactional.getNoRollbackForClassName()) {

rollbackRules.add(new NoRollbackRule(rbRule));

}

transactionInfo.setRollbackRules(rollbackRules);

return transactionInfo;

}

});

} catch (TransactionalExecutor.ExecutionException e) {

//执行异常

TransactionalExecutor.Code code = e.getCode();

switch (code) {

case RollbackDone:

throw e.getOriginalException();

case BeginFailure:

succeed = false;

failureHandler.onBeginFailure(e.getTransaction(), e.getCause());

throw e.getCause();

case CommitFailure:

succeed = false;

failureHandler.onCommitFailure(e.getTransaction(), e.getCause());

throw e.getCause();

case RollbackFailure:

failureHandler.onRollbackFailure(e.getTransaction(), e.getOriginalException());

throw e.getOriginalException();

case RollbackRetrying:

failureHandler.onRollbackRetrying(e.getTransaction(), e.getOriginalException());

throw e.getOriginalException();

default:

throw new ShouldNeverHappenException(String.format("Unknown TransactionalExecutor.Code: %s", code));

}

} finally {

if (degradeCheck) {

EVENT_BUS.post(new DegradeCheckEvent(succeed));

}

}

}

在这里我们,主要关注一个重点方法 execute() ,这个方法主要用来执行事务的具体流程:

获取事务信息。执行全局事务。发生异常全局回滚,各个数据通过UndoLog进行事务补偿。全局事务提交。清除所有资源。

这个位置也是一个非常核心的一个位置,因为我们所有的业务进来以后都会去走这个位置,具体源码如下所示:

public Object execute(TransactionalExecutor business) throws Throwable {

// 1. Get transactionInfo

//获取事务信息

TransactionInfo txInfo = business.getTransactionInfo();

if (txInfo == null) {

throw new ShouldNeverHappenException("transactionInfo does not exist");

}

// 1.1 Get current transaction, if not null, the tx role is GlobalTransactionRole.Participant.

//获取当前事务,主要获取XID

GlobalTransaction tx = GlobalTransactionContext.getCurrent();

// 1.2 Handle the transaction propagation.

//根据配置的不同事务传播行为,执行不同的逻辑

Propagation propagation = txInfo.getPropagation();

SuspendedResourcesHolder suspendedResourcesHolder = null;

try {

switch (propagation) {

case NOT_SUPPORTED:

// If transaction is existing, suspend it.

if (existingTransaction(tx)) {

suspendedResourcesHolder = tx.suspend();

}

// Execute without transaction and return.

return business.execute();

case REQUIRES_NEW:

// If transaction is existing, suspend it, and then begin new transaction.

if (existingTransaction(tx)) {

suspendedResourcesHolder = tx.suspend();

tx = GlobalTransactionContext.createNew();

}

// Continue and execute with new transaction

break;

case SUPPORTS:

// If transaction is not existing, execute without transaction.

if (notExistingTransaction(tx)) {

return business.execute();

}

// Continue and execute with new transaction

break;

case REQUIRED:

// If current transaction is existing, execute with current transaction,

// else continue and execute with new transaction.

break;

case NEVER:

// If transaction is existing, throw exception.

if (existingTransaction(tx)) {

throw new TransactionException(

String.format("Existing transaction found for transaction marked with propagation never, xid = %s"

, tx.getXid()));

} else {

// Execute without transaction and return.

return business.execute();

}

case MANDATORY:

// If transaction is not existing, throw exception.

if (notExistingTransaction(tx)) {

throw new TransactionException("No existing transaction found for transaction marked with propagation mandatory");

}

// Continue and execute with current transaction.

break;

default:

throw new TransactionException("Not Supported Propagation:" + propagation);

}

// 1.3 If null, create new transaction with role GlobalTransactionRole.Launcher.

//如果当前事务为空,创建一个新的事务

if (tx == null) {

tx = GlobalTransactionContext.createNew();

}

// set current tx config to holder

GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);

try {

// 2. If the tx role is GlobalTransactionRole.Launcher, send the request of beginTransaction to TC,

// else do nothing. Of course, the hooks will still be triggered.

//开始执行全局事务

beginTransaction(txInfo, tx);

Object rs;

try {

// Do Your Business

// 执行当前业务逻辑

//1、在TC注册当前分支事务,TC会在branch_table中插入一条分支事务数据

//2、执行本地update语句,并在执行前后查询数据状态,并把数据前后镜像存入到undo_log中

//3、远程调用其他应用,远程应用接收到XID,也会注册分支事务,写入branch_table以及本地undo_log表

//4、会在lock_table表中插入全局锁数据(一个分支一条)

rs = business.execute();

} catch (Throwable ex) {

// 3. The needed business exception to rollback.

//发生异常全局回滚,每个事务通过undo_log表进行事务补偿

completeTransactionAfterThrowing(txInfo, tx, ex);

throw ex;

}

// 4. everything is fine, commit.

//全局提交

commitTransaction(tx);

return rs;

} finally {

//5. clear

//清理所有资源

resumeGlobalLockConfig(previousConfig);

triggerAfterCompletion();

cleanUp();

}

} finally {

// If the transaction is suspended, resume it.

if (suspendedResourcesHolder != null) {

tx.resume(suspendedResourcesHolder);

}

}

}

这其中的第三步和第四步其实在向 TC(Seata-Server)发起全局事务的提交或者回滚,在这里我们首先关注执行全局事务的 ​​beginTransaction()​​ 方法。

// 向TC发起请求,采用模板模式

private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {

try {

triggerBeforeBegin();

//对TC发起请求

tx.begin(txInfo.getTimeOut(), txInfo.getName());

triggerAfterBegin();

} catch (TransactionException txe) {

throw new TransactionalExecutor.ExecutionException(tx, txe,

TransactionalExecutor.Code.BeginFailure);

}

}

在来关注其中,向TC发起请求的 tx.begin() 方法,而调用begin()方法的类为:DefaultGlobalTransaction。

@Override

public void begin(int timeout, String name) throws TransactionException {

//判断调用者是否为TM

if (role != GlobalTransactionRole.Launcher) {

assertXIDNotNull();

if (LOGGER.isDebugEnabled()) {

LOGGER.debug("Ignore Begin(): just involved in global transaction [{ }]", xid);

}

return;

}

assertXIDNull();

String currentXid = RootContext.getXID();

if (currentXid != null) {

throw new IllegalStateException("Global transaction already exists," +

" cant begin a new global transaction, currentXid = " + currentXid);

}

//获取XID

xid = transactionManager.begin(null, null, name, timeout);

status = GlobalStatus.Begin;

//绑定XID

RootContext.bind(xid);

if (LOGGER.isInfoEnabled()) {

LOGGER.info("Begin new global transaction [{ }]", xid);

}

}

再来看一下 transactionManager.begin() 方法,这个时候使用的是 DefaultTransactionManager.begin 默认的事务管理者,来获取XID,传入事务相关的信息 ,最好TC返回对应的全局事务XID,它调用的是DefaultTransactionManager.begin()方法。

public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)

throws TransactionException {

GlobalBeginRequest request = new GlobalBeginRequest();

request.setTransactionName(name);

request.setTimeout(timeout);

//发送请求得到响应

GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);

if (response.getResultCode() == ResultCode.Failed) {

throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());

}

//返回XID

return response.getXid();

}

在这里我们需要关注一个syncCall,在这里采用的是Netty通讯方式。

private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {

try {

// 通过Netty发送请求

return (AbstractTransactionResponse) TmNettyRemotingClient.getInstance().sendSyncRequest(request);

} catch (TimeoutException toe) {

throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", toe);

}

}

具体图解如下:

在这里我们需要重点了解 GlobalTransactionScanner 这个类型,在这个类型中继承了一些接口和抽象类,这个类主要作用就是扫描有注解的Bean,并做AOP增强。

ApplicationContextAware​:继承这个类型以后,需要实现其方法setApplicationContext()​,当Spring启动完成以后,会自动调用这个类型,将ApplicationContext​ 给bean​,也就是说,GlobalTransactionScanner 能够很自然的使用Spring环境。InitializingBean​: 继承这个接口,需要实现afterPropertiesSet()​ ,但凡是继承这个接口的类,在初始化的时候,当所有的properties 设置完成以后,会执行这个方法。DisposableBean​ : 这个类,实现了一个destroy() 这个方法是在销毁的时候去调用。AbstractAutoProxyCreator​: 这个类是Spring实现AOP的一种方式,本质上是一个BeanPostProcessor​ ,在Bean初始化至去年,调用内部createProxy() ,创建一个Bean的AOP代理Bean并返回,对Bean进行增强。

Seata数据源代理

在上面的环节中,我们讲解了Seata AT模式2PC的执行流程,那么现在我们就来带大家了解一下关于AT数据源代理的信息,这也是AT模式中非常关键的一个重要知识点,大家可以拿起小本子,记下来。

首先AT模式的核心主要分为一下两个:

开启全局事务,获取全局锁。解析SQL并写入undoLog中。

关于第一点我们已经分析清楚了,第二点就是关于AT模式如何解析SQL并写入undoLog中,但是在这之前,我们需要知道Seata是如何选择数据源,并进行数据源代理的。虽然全局事务拦截成功后最终还是执行了业务方法进行SQL提交和操作,但是由于Seata对数据源进行了代理,所以SQL的解析和undoLog的操作,是在数据源代理中进行完成的。

数据源代理是Seata中一个非常重要的知识点,在分布式事务运行过程中,undoLog的记录、资源的锁定,用户都是无感知的,因为这些操作都是数据源的代理中完成了,恰恰是这样,我们才要去了解,这样不仅有利于我们了解Seata的核心操作,还能对以后源码阅读有所帮助,因为其实很多底层代码都会去使用这样用户无感知的方式(代理)去实现。

同样,我们在之前的寻找源码入口的时候,通过我们项目中引入的jar找到一个 SeataAutoConfiguration 类,我们在里面找到一个SeataDataSourceBeanPostProcessor(),这个就是我们数据源代理的入口方法。

我们进入SeataDataSourceBeanPostProcessor类里面,发现继承了一个 BeanPostProcessor ,这个接口我们应该很熟悉,这个是Sprng的拓展接口,所有的Bean对象,都有进入两个方法 postProcessAfterInitialization() 和 postProcessBeforeInitialization() 这两个方法都是由 BeanPostProcessor提供的,这两个方法,一个是初始化之前执行Before。一个是在初始化之后执行After,主要用来对比我们的的Bean是否为数据源代理对象。

在这里我们需要关注到一个postProcessAfterInitialization.proxyDataSource() 方法,这个里面。

private Object proxyDataSource(Object originBean) {

DataSourceProxy dataSourceProxy = DataSourceProxyHolder.get().putDataSource((DataSource) originBean);

if (this.useJdkProxy) {

return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), SpringProxyUtils.getAllInterfaces(originBean), (proxy, method, args) -> handleMethodProxy(dataSourceProxy, method, args, originBean));

} else {

return Enhancer.create(originBean.getClass(), (MethodInterceptor) (proxy, method, args, methodProxy) -> handleMethodProxy(dataSourceProxy, method, args, originBean));

}

}

这里有一个DataSourceProxy代理对象,我们需要看的就是这个类,这个就是我们数据库代理的对象,我们从我们下载的源码项目中,搜索这个代理对象,当我们打开这个类的目录时发现,除了这个,还有ConnectionProxy 连接对象、StatementProxy、PreparedStatementProxy SQL执行对象,这些都被Seata进行了代理,为什么要对这些都进行代理,代理的目的其实为了执行Seata的业务逻辑,生成undoLog,全局事务的开启,事务的提交回滚等操作。

DataSourceProxy 具体做了什么,主要功能有哪些,我们来看一下。他在源码中是如何体现的,我们需要关注的是init()。

public class DataSourceProxy extends AbstractDataSourceProxy implements Resource {

private String resourceGroupId;

private void init(DataSource dataSource, String resourceGroupId) {

//资源组ID,默认是“default”这个默认值

this.resourceGroupId = resourceGroupId;

try (Connection connection = dataSource.getConnection()) {

//根据原始数据源得到JDBC连接和数据库类型

jdbcUrl = connection.getMetaData().getURL();

dbType = JdbcUtils.getDbType(jdbcUrl);

if (JdbcConstants.ORACLE.equals(dbType)) {

userName = connection.getMetaData().getUserName();

} else if (JdbcConstants.MARIADB.equals(dbType)) {

dbType = JdbcConstants.MYSQL;

}

} catch (SQLException e) {

throw new IllegalStateException("can not init dataSource", e);

}

initResourceId();

DefaultResourceManager.get().registerResource(this);

if (ENABLE_TABLE_META_CHECKER_ENABLE) {

//如果配置开关打开,会定时在线程池不断更新表的元数据缓存信息

tableMetaExecutor.scheduleAtFixedRate(() -> {

try (Connection connection = dataSource.getConnection()) {

TableMetaCacheFactory.getTableMetaCache(DataSourceProxy.this.getDbType())

.refresh(connection, DataSourceProxy.this.getResourceId());

} catch (Exception ignore) {

}

}, 0, TABLE_META_CHECKER_INTERVAL, TimeUnit.MILLISECONDS);

}

//Set the default branch type to AT in the RootContext.

RootContext.setDefaultBranchType(this.getBranchType());

}

}

从上面我们可以看出,他主要做了以下几点的增强:

给每个数据源标识资源组ID。如果打开配置,会有一个定时线程池定时更新表的元数据信息并缓存到本地。生成代理连接ConnectionProxy 对象。

在这三个增强功能里面,第三个是最重要的,在AT模式里面,会自动记录undoLog,资源锁定,都是通过ConnectionProxy完成的,除此之外 DataSrouceProxy重写了一个方法 getConnection,因为这里返回的是一个 ConnectionProxy,而不是原生的Connection。

@Override

public ConnectionProxy getConnection() throws SQLException {

Connection targetConnection = targetDataSource.getConnection();

return new ConnectionProxy(this, targetConnection);

}

@Override

public ConnectionProxy getConnection(String username, String password) throws SQLException {

Connection targetConnection = targetDataSource.getConnection(username, password);

return new ConnectionProxy(this, targetConnection);

}ConnectionProxy

ConnectionProxy 继承 AbstractConnectionProxy ,在这个父类中有很多公用的方法,在这个父类有 PreparedStatementProxy 、StatementProxy 、DataSourceProxy。

所以我们需要先来看一下AbstractConnectionProxy,因为这里封装了需要我们用到的通用方法和逻辑,在其中我们需要关注的主要在于 PreparedStatementProxy 和 StatementProxy ,在这里的逻辑主要是数据源连接的步骤,连接获取,创建执行对象等等。

@Override

public Statement createStatement() throws SQLException {

//调用真实连接对象获取Statement对象

Statement targetStatement = getTargetConnection().createStatement();

//创建Statement的代理

return new StatementProxy(this, targetStatement);

}

@Override

public PreparedStatement prepareStatement(String sql) throws SQLException {

//获取数据库类型 mysql/oracle

String dbType = getDbType();

// support oracle 10.2+

PreparedStatement targetPreparedStatement = null;

//如果是AT模式且开启全局事务

if (BranchType.AT == RootContext.getBranchType()) {

ListsqlRecognizers = SQLVisitorFactory.get(sql, dbType);

if (sqlRecognizers != null && sqlRecognizers.size() == 1) {

SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);

if (sqlRecognizer != null && sqlRecognizer.getSQLType() == SQLType.INSERT) {

//获取表的元数据

TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dbType).getTableMeta(getTargetConnection(),

sqlRecognizer.getTableName(), getDataSourceProxy().getResourceId());

//得到表的主键列名

String[] pkNameArray = new String[tableMeta.getPrimaryKeyOnlyName().size()];

tableMeta.getPrimaryKeyOnlyName().toArray(pkNameArray);

targetPreparedStatement = getTargetConnection().prepareStatement(sql,pkNameArray);

}

}

}

if (targetPreparedStatement == null) {

targetPreparedStatement = getTargetConnection().prepareStatement(sql);

}

//创建PreparedStatementProxy代理

return new PreparedStatementProxy(this, targetPreparedStatement, sql);

}

在这两个代理对象中,都用到了以下几个方法:

@Override

public ResultSet executeQuery(String sql) throws SQLException {

this.targetSQL = sql;

return ExecuteTemplate.execute(this, (statement, args) -> statement.executeQuery((String) args[0]), sql);

}

@Override

public int executeUpdate(String sql) throws SQLException {

this.targetSQL = sql;

return ExecuteTemplate.execute(this, (statement, args) -> statement.executeUpdate((String) args[0]), sql);

}

@Override

public boolean execute(String sql) throws SQLException {

this.targetSQL = sql;

return ExecuteTemplate.execute(this, (statement, args) -> statement.execute((String) args[0]), sql);

}

在这些方法中都调用了 ExecuteTemplate.execute(),所以我们就看一下在 ExecuteTemplate类中具体是做了什么操作:

public class ExecuteTemplate {

public static T execute(ListsqlRecognizers,

StatementProxystatementProxy,

StatementCallbackstatementCallback,

Object... args) throws SQLException {

//如果没有全局锁,并且不是AT模式,直接执行SQL

if (!RootContext.requireGlobalLock() && BranchType.AT != RootContext.getBranchType()) {

// Just work as original statement

return statementCallback.execute(statementProxy.getTargetStatement(), args);

}

//得到数据库类型- mysql/oracle

String dbType = statementProxy.getConnectionProxy().getDbType();

if (CollectionUtils.isEmpty(sqlRecognizers)) {

//sqlRecognizers 为SQL语句的解析器,获取执行的SQL,通过它可以获得SQL语句表名、相关的列名、类型等信息,最后解析出对应的SQL表达式

sqlRecognizers = SQLVisitorFactory.get(

statementProxy.getTargetSQL(),

dbType);

}

Executorexecutor;

if (CollectionUtils.isEmpty(sqlRecognizers)) {

//如果seata没有找到合适的SQL语句解析器,那么便创建简单执行器PlainExecutor

//PlainExecutor直接使用原生的Statment对象执行SQL

executor = new PlainExecutor<>(statementProxy, statementCallback);

} else {

if (sqlRecognizers.size() == 1) {

SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);

switch (sqlRecognizer.getSQLType()) {

//新增

case INSERT:

executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,

new Class[]{ StatementProxy.class, StatementCallback.class, SQLRecognizer.class},

new Object[]{ statementProxy, statementCallback, sqlRecognizer});

break;

//修改

case UPDATE:

executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);

break;

//删除

case DELETE:

executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);

break;

//加锁

case SELECT_FOR_UPDATE:

executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);

break;

//插入加锁

case INSERT_ON_DUPLICATE_UPDATE:

switch (dbType) {

case JdbcConstants.MYSQL:

case JdbcConstants.MARIADB:

executor =

new MySQLInsertOrUpdateExecutor(statementProxy, statementCallback, sqlRecognizer);

break;

default:

throw new NotSupportYetException(dbType + " not support to INSERT_ON_DUPLICATE_UPDATE");

}

break;

//原生

default:

executor = new PlainExecutor<>(statementProxy, statementCallback);

break;

}

} else {

//批量处理SQL语句

executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);

}

}

T rs;

try {

//执行

rs = executor.execute(args);

} catch (Throwable ex) {

if (!(ex instanceof SQLException)) {

// Turn other exception into SQLException

ex = new SQLException(ex);

}

throw (SQLException) ex;

}

return rs;

}

}

在 ExecuteTemplate就一个 execute(),Seata将SQL执行委托给不同的执行器(模板),Seata提供了6种执行器也就是我们代码 case 中(INSERT,UPDATE,DELETE,SELECT_FOR_UPDATE,INSERT_ON_DUPLICATE_UPDATE),这些执行器的父类都是AbstractDMLBaseExecutor。

UpdateExecutor: 执行update语句。InsertExecutor: 执行insert语句。DeleteExecutor: 执行delete语句。SelectForUpdateExecutor: 执行select for update语句。PlainExecutor: 执行普通查询语句。MultiExecutor: 复合执行器,在一条SQL语句中执行多条语句。

关系图如下:

然后我们找到 rs = executor.execute(args); 最终执行的方法,找到最顶级的父类BaseTransactionalExecutor.execute()。

@Override

public T execute(Object... args) throws Throwable {

String xid = RootContext.getXID();

if (xid != null) {

//获取XID

statementProxy.getConnectionProxy().bind(xid);

}

//设置全局锁

statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());

return doExecute(args);

}

在根据doExecute(args);找到其中的重写方法 AbstractDMLBaseExecutor.doExecute()。

@Override

public T doExecute(Object... args) throws Throwable {

AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();

//是否自动提交

if (connectionProxy.getAutoCommit()) {

return executeAutoCommitTrue(args);

} else {

return executeAutoCommitFalse(args);

}

}

对于数据库而言,本身都是自动提交的,所以我们进入executeAutoCommitTrue()。

protected T executeAutoCommitTrue(Object[] args) throws Throwable {

ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();

try {

//设置为手动提交

connectionProxy.changeAutoCommit();

return new LockRetryPolicy(connectionProxy).execute(() -> {

//调用手动提交方法,得到分支执行的最终结果

T result = executeAutoCommitFalse(args);

//执行提交

connectionProxy.commit();

return result;

});

} catch (Exception e) {

// when exception occur in finally,this exception will lost, so just print it here

LOGGER.error("execute executeAutoCommitTrue error:{ }", e.getMessage(), e);

if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {

connectionProxy.getTargetConnection().rollback();

}

throw e;

} finally {

connectionProxy.getContext().reset();

connectionProxy.setAutoCommit(true);

}

}

connectionProxy.changeAutoCommit()方法,修改为手动提交后,我们看来最关键的代码executeAutoCommitFalse()。

protected T executeAutoCommitFalse(Object[] args) throws Exception {

if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) {

throw new NotSupportYetException("multi pk only support mysql!");

}

//获取前镜像

TableRecords beforeImage = beforeImage();

//执行具体业务

T result = statementCallback.execute(statementProxy.getTargetStatement(), args);

//获取执行数量

int updateCount = statementProxy.getUpdateCount();

//判断如果执行数量大于0

if (updateCount > 0) {

//获取后镜像

TableRecords afterImage = afterImage(beforeImage);

//暂存到undolog中,在Commit的时候保存到数据库

prepareUndoLog(beforeImage, afterImage);

}

return result;

}

我们再回到executeAutoCommitTrue中,去看看提交做了哪些操作connectionProxy.commit()。

@Override

public void commit() throws SQLException {

try {

lockRetryPolicy.execute(() -> {

//具体执行

doCommit();

return null;

});

} catch (SQLException e) {

if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) {

rollback();

}

throw e;

} catch (Exception e) {

throw new SQLException(e);

}

}

进入到doCommit()中。

private void doCommit() throws SQLException {

//判断是否存在全局事务

if (context.inGlobalTransaction()) {

processGlobalTransactionCommit();

} else if (context.isGlobalLockRequire()) {

processLocalCommitWithGlobalLocks();

} else {

targetConnection.commit();

}

}

作为分布式事务,一定是存在全局事务的,所以我们进入 processGlobalTransactionCommit()。

private void processGlobalTransactionCommit() throws SQLException {

try {

//注册分支事务

register();

} catch (TransactionException e) {

recognizeLockKeyConflictException(e, context.buildLockKeys());

}

try {

//写入数据库undolog

UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);

//执行原生提交 一阶段提交

targetConnection.commit();

} catch (Throwable ex) {

LOGGER.error("process connectionProxy commit error: { }", ex.getMessage(), ex);

report(false);

throw new SQLException(ex);

}

if (IS_REPORT_SUCCESS_ENABLE) {

report(true);

}

context.reset();

}

其中register()方法就是注册分支事务的方法,同时还会将undoLog写入数据库和执行提交等操作。

//注册分支事务,生成分支事务ID

private void register() throws TransactionException {

if (!context.hasUndoLog() || !context.hasLockKey()) {

return;

}

//注册分支事务

Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(),

null, context.getXid(), context.getApplicationData(), context.buildLockKeys());

context.setBranchId(branchId);

}

然后我们在回到processGlobalTransactionCommit中,看看写入数据库中的flushUndoLogs()。

@Override

public void flushUndoLogs(ConnectionProxy cp) throws SQLException {

ConnectionContext connectionContext = cp.getContext();

if (!connectionContext.hasUndoLog()) {

return;

}

//获取XID

String xid = connectionContext.getXid();

//获取分支ID

long branchId = connectionContext.getBranchId();

BranchUndoLog branchUndoLog = new BranchUndoLog();

branchUndoLog.setXid(xid);

branchUndoLog.setBranchId(branchId);

branchUndoLog.setSqlUndoLogs(connectionContext.getUndoItems());

UndoLogParser parser = UndoLogParserFactory.getInstance();

byte[] undoLogContent = parser.encode(branchUndoLog);

if (LOGGER.isDebugEnabled()) {

LOGGER.debug("Flushing UNDO LOG: { }", new String(undoLogContent, Constants.DEFAULT_CHARSET));

}

CompressorType compressorType = CompressorType.NONE;

if (needCompress(undoLogContent)) {

compressorType = ROLLBACK_INFO_COMPRESS_TYPE;

undoLogContent = CompressorFactory.getCompressor(compressorType.getCode()).compress(undoLogContent);

}

//写入数据库具体位置

insertUndoLogWithNormal(xid, branchId, buildContext(parser.getName(), compressorType), undoLogContent, cp.getTargetConnection());

}

具体写入方法,此时我们使用的是MySql,所以执行的是MySql实现类MySQLUndoLogManager.insertUndoLogWithNormal()。

@Override

protected void insertUndoLogWithNormal(String xid, long branchId, String rollbackCtx, byte[] undoLogContent,

Connection conn) throws SQLException {

insertUndoLog(xid, branchId, rollbackCtx, undoLogContent, State.Normal, conn);

}

//具体写入操作

private void insertUndoLog(String xid, long branchId, String rollbackCtx, byte[] undoLogContent,

State state, Connection conn) throws SQLException {

try (PreparedStatement pst = conn.prepareStatement(INSERT_UNDO_LOG_SQL)) {

pst.setLong(1, branchId);

pst.setString(2, xid);

pst.setString(3, rollbackCtx);

pst.setBytes(4, undoLogContent);

pst.setInt(5, state.getValue());

pst.executeUpdate();

} catch (Exception e) {

if (!(e instanceof SQLException)) {

e = new SQLException(e);

}

throw (SQLException) e;

}

}

具体流程如下所示:

Seata 服务端

我们找到Server.java 这里就是启动入口,在这个入口中找到协调者,因为TC整体的操作就是协调整体的全局事务。

//默认协调者

DefaultCoordinator coordinator = DefaultCoordinator.getInstance(nettyRemotingServer);

在DefaultCoordinator类中我们找到 一个doGlobalBegin 这个就是处理全局事务开始的方法,以及全局提交 doGlobalCommit 和全局回滚 doGlobalRollback。

//处理全局事务

@Override

protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)

throws TransactionException {

//响应客户端xid

response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),

request.getTransactionName(), request.getTimeout()));

if (LOGGER.isInfoEnabled()) {

LOGGER.info("Begin new global transaction applicationId: { },transactionServiceGroup: { }, transactionName: { },timeout:{ },xid:{ }",

rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout(), response.getXid());

}

}

//处理全局提交

@Override

protected void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response, RpcContext rpcContext)

throws TransactionException {

MDC.put(RootContext.MDC_KEY_XID, request.getXid());

response.setGlobalStatus(core.commit(request.getXid()));

}

//处理全局回滚

@Override

protected void doGlobalRollback(GlobalRollbackRequest request, GlobalRollbackResponse response,

RpcContext rpcContext) throws TransactionException {

MDC.put(RootContext.MDC_KEY_XID, request.getXid());

response.setGlobalStatus(core.rollback(request.getXid()));

}

在这里我们首先关注 doGlobalBegin 中 core.begin()。

@Override

public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)

throws TransactionException {

//创建全局事务Session

GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name,

timeout);

MDC.put(RootContext.MDC_KEY_XID, session.getXid());

//为Session重添加回调监听,SessionHolder.getRootSessionManager() 获取一个全局Session管理器DataBaseSessionManager

//观察者设计模式,创建DataBaseSessionManager

session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());

//全局事务开始

session.begin();

// transaction start event

MetricsPublisher.postSessionDoingEvent(session, false);

return session.getXid();

}

然后我们在来看一下SessionHolder.getRootSessionManager()。

/

**

* Gets root session manager.

* 获取一个全局Session管理器

* @return the root session manager

*/

public static SessionManager getRootSessionManager() {

if (ROOT_SESSION_MANAGER == null) {

throw new ShouldNeverHappenException("SessionManager is NOT init!");

}

return ROOT_SESSION_MANAGER;

}

public static void init(String mode) {

if (StringUtils.isBlank(mode)) {

mode = CONFIG.getConfig(ConfigurationKeys.STORE_SESSION_MODE,

CONFIG.getConfig(ConfigurationKeys.STORE_MODE, SERVER_DEFAULT_STORE_MODE));

}

StoreMode storeMode = StoreMode.get(mode);

//判断Seata模式,当前为DB

if (StoreMode.DB.equals(storeMode)) {

//通过SPI机制读取SessionManager接口实现类,读取的META-INF.services目录,在通过反射机制创建对象DataBaseSessionManager

ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName());

........

}

}

在这里他其实读取的是DB模式下 io.seata.server.session.SessionManager文件的内容。

我们在回到begin方法中,去查看session.begin()。

@Override

public void begin() throws TransactionException {

//声明全局事务开始

this.status = GlobalStatus.Begin;

//开始时间

this.beginTime = System.currentTimeMillis();

//激活全局事务

this.active = true;

//将SessionManager放入到集合中,调用onBegin方法

for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {

//调用父级抽象类的方法

lifecycleListener.onBegin(this);

}

}

这里我们来看一下 onBegin() 方法,调用的是父级的方法,在这其中我们要关注 addGlobalSession() 方法,但是要注意,这里我们用的是db模式所以调用的是db模式的 DateBaseSessionManager。

@Override

public void onBegin(GlobalSession globalSession) throws TransactionException {

//这里调用的是DateBaseSessionManager

addGlobalSession(globalSession);

}

@Override

public void addGlobalSession(GlobalSession session) throws TransactionException {

if (StringUtils.isBlank(taskName)) {

//写入session

boolean ret = transactionStoreManager.writeSession(LogOperation.GLOBAL_ADD, session);

if (!ret) {

throw new StoreException("addGlobalSession failed.");

}

} else {

boolean ret = transactionStoreManager.writeSession(LogOperation.GLOBAL_UPDATE, session);

if (!ret) {

throw new StoreException("addGlobalSession failed.");

}

}

}

然后在看查询其中关键的方法DataBaseTransactionStoreManager.writeSession()。

@Override

public boolean writeSession(LogOperation logOperation, SessionStorable session) {

//第一次进入是写入 会进入当前方法

//全局添加

if (LogOperation.GLOBAL_ADD.equals(logOperation)) {

return logStore.insertGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));

//全局修改

} else if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) {

return logStore.updateGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));

//全局删除

} else if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) {

return logStore.deleteGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));

//分支添加

} else if (LogOperation.BRANCH_ADD.equals(logOperation)) {

return logStore.insertBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));

//分支更新

} else if (LogOperation.BRANCH_UPDATE.equals(logOperation)) {

return logStore.updateBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));

//分支移除

} else if (LogOperation.BRANCH_REMOVE.equals(logOperation)) {

return logStore.deleteBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));

} else {

throw new StoreException("Unknown LogOperation:" + logOperation.name());

}

}

我们就看第一次进去的方法logStore.insertGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session))。

@Override

public boolean insertGlobalTransactionDO(GlobalTransactionDO globalTransactionDO) {

String sql = LogStoreSqlsFactory.getLogStoreSqls(dbType).getInsertGlobalTransactionSQL(globalTable);

Connection conn = null;

PreparedStatement ps = null;

try {

int index = 1;

conn = logStoreDataSource.getConnection();

conn.setAutoCommit(true);

ps = conn.prepareStatement(sql);

ps.setString(index++, globalTransactionDO.getXid());

ps.setLong(index++, globalTransactionDO.getTransactionId());

ps.setInt(index++, globalTransactionDO.getStatus());

ps.setString(index++, globalTransactionDO.getApplicationId());

ps.setString(index++, globalTransactionDO.getTransactionServiceGroup());

String transactionName = globalTransactionDO.getTransactionName();

transactionName = transactionName.length() > transactionNameColumnSize ?

transactionName.substring(0, transactionNameColumnSize) :

transactionName;

ps.setString(index++, transactionName);

ps.setInt(index++, globalTransactionDO.getTimeout());

ps.setLong(index++, globalTransactionDO.getBeginTime());

ps.setString(index++, globalTransactionDO.getApplicationData());

return ps.executeUpdate() > 0;

} catch (SQLException e) {

throw new StoreException(e);

} finally {

IOUtil.close(ps, conn);

}

}

在这里有一个 GlobalTransactionDO 对象,里面有xid、transactionId 等等,到这里是不是就很熟悉了。

还记得我们第一次使用Seata的时候会创建三张表:

branch_table 分支事务表。global_table 全局事务表。lock_table 全局锁表。

而这里就是对应我们的global_table表,其他两个也是差不多,都是一样的操作。

流程图如下:

总结

完整流程图:

对于Seata源码来说主要是了解从哪里入口以及核心点在哪里,遇到有疑问的,可以Debug,对于Seata AT模式,我们主要掌握的核心点是

如何获取全局锁、开启全局事务。解析SQL并写入undolog。

围绕这两点去看的话,会有针对性一点,到这里我们的Seata源码就讲解完了。

很赞哦!(86)