草庐IT

Seata 1.5.2 源码学习

狂乱的贵公子 2023-04-19 原文

文章有点长,我决定用半个小时来和你分享~? 废话不多说,上代码。。。

基于Seata 1.5.2,项目中用 seata-spring-boot-starter

1. SeataDataSourceAutoConfiguration

SeataDataSourceAutoConfiguration 主要是配置数据源代理,可以看到:

  1. 默认seata.enabled、seata.enableAutoDataSourceProxy、seata.enable-auto-data-source-proxy都是true
  2. 只有当classpath中有DataSource时才会进行此配置
  3. 创建了一个SeataAutoDataSourceProxyCreator,用于自动代理数据源

先记一下,SeataAutoDataSourceProxyCreator是一个BeanPostProcessor

刚才new了一个SeataAutoDataSourceProxyCreator,继续看构造方法,默认useJdkProxy是false,excludes为空,dataSourceProxyMode是AT

构造方法中最重要的一件事情是构造AOP通知(拦截器),这里new了一个SeataAutoDataSourceProxyAdvice

SeataAutoDataSourceProxyAdvice是一个MethodInterceptor。

MethodInterceptor是aop中的一个接口,当目标方法被调用时就会调用与之关联的MethodInterceptor的invoke方法

至此,在构造方法中完成了advisors的赋值,advisors[]中有一个DefaultIntroductionAdvisor,DefaultIntroductionAdvisor中引用了SeataAutoDataSourceProxyAdvice

前面说过,SeataAutoDataSourceProxyCreator是一个BeanPostProcessor,而BeanPostProcessor是BeanFactory中的一个钩子(回调),称之为后置处理器

AbstractAutoProxyCreator#postProcessBeforeInstantiation()

AbstractAutoProxyCreator#postProcessAfterInitialization()

AbstractAutoProxyCreator#wrapIfNecessary()

AbstractAutoProxyCreator#getAdvicesAndAdvisorsForBean()

SeataAutoDataSourceProxyCreator#getAdvicesAndAdvisorsForBean()

SeataAutoDataSourceProxyCreator#wrapIfNecessary()

DataSourceProxyHolder维护了数据源对象与数据源代理对象的映射

至此,数据源代理部分就看完了,下面总结一下:

1、启动的时候自动配置数据源代理,创建了一个SeataAutoDataSourceProxyCreator

2、SeataAutoDataSourceProxyCreator在构造方法中创建AOP通知,并赋值给其属性

3、AbstractAutoProxyCreator是一个抽象类不能被实例化,能实例化的只有SeataAutoDataSourceProxyCreator

4、SeataAutoDataSourceProxyCreator从AbstractAutoProxyCreator那里继承了很多属性和方法,其中就包括postProcessBeforeInstantiation()、postProcessAfterInitialization()、createProxy()等等

5、SeataAutoDataSourceProxyCreator间接实现了BeanPostProcessor接口,也就是说它也是BeanPostProcessor的一个实现类

6、BeanFactory回调所有的BeanPostProcessor#postProcessAfterInitialization()时,就会调用SeataAutoDataSourceProxyCreator的postProcessAfterInitialization()方法,最终会调用wrapIfNecessary()方法

7、wrapIfNecessary()只关心DataSource对象,它负责为DataSource对象生成代理对象,并且在SeataAutoDataSourceProxyCreator中维护了DataSource对象与SeataDataSourceProxy对象之间的映射关心

8、创建代理对象时,会给DataSource对象应用AOP拦截器。用AOP的话来讲,就是给目标对象DataSource织入通知,并创建一个被增强的代理对象

9、通知(拦截器)是SeataAutoDataSourceProxyAdvice,它实现了MethodInterceptor接口

10、SeataAutoDataSourceProxyAdvice#invoke()方法所做的事情就是,拿到原始DataSource的代理对象,并且在代理对象上调用目标方法

综上所述,以上做的所有工作都是为了将来调用 javax.sql.DataSource 上的任意方法时都会被拦截,然后调用其代理对象上对应的方法。而DataSource中最重要的一个方法就是getConnection()

划重点:将来,所有调用 javax.sql.DataSource#getConnection() 都会被拦截,然后在代理对象上执行getConnection(),因此可以这样说

调 javax.sql.DataSource#getConnection() 实际上执行的是 io.seata.rm.datasource.SeataDataSourceProxy#getConnection()

2. SeataAutoConfiguration

SeataAutoConfiguration里面主要是配置GlobalTransactionScanner(全局事务扫描器)

seata.enabled=true 才会开启 SeataAutoConfiguration

GlobalTransactionScanner 也继承自 AbstractAutoProxyCreator,同时还实现了InitializingBean接口。BeanFactory在设置了所有bean属性之后会调用InitializingBean的afterPropertiesSet()方法

GlobalTransactionScanner#afterPropertiesSet()

io.seata.common.DefaultValues中定义了很多默认值

同样地,因为实现了BeanPostProcessor接口,所以在启动时BeanFactory实例化Bean之后,会调用GlobalTransactionScanner的postProcessAfterInitialization(),尽管这个postProcessAfterInitialization()方法时从AbstractAutoProxyCreator那里继承来的,但是不影响啊,还是会调用GlobalTransactionScanner这个bean的postProcessAfterInitialization()方法。于是,最终又会调wrapIfNecessary()方法。

GlobalTransactionScanner#wrapIfNecessary()

这里面有一个很重要的逻辑就是,创建了一个GlobalTransactionalInterceptor对象,并赋值给interceptor

AbstractAutoProxyCreator#getAdvicesAndAdvisorsForBean()是一个抽象方法,实现在子类GlobalTransactionScanner中

因此,所有在GlobalTransactionScanner#wrapIfNecessary()中被代理的对象,都被应用GlobalTransactionalInterceptor

GlobalTransactionalInterceptor也是一个MethodInterceptor

也就是说,目标方法的调用都会转到GlobalTransactionalInterceptor#invoke()上

GlobalTransactionalInterceptor#handleGlobalTransaction()

事务执行直接调用TransactionalTemplate的execute()方法

io.seata.tm.api.TransactionalTemplate#execute()

io.seata.tm.api.GlobalTransactionContext#getCurrent() 获取当前事务

io.seata.tm.api.TransactionalTemplate#beginTransaction()

tx是DefaultGlobalTransaction

io.seata.tm.api.DefaultGlobalTransaction#begin()

DefaultGlobalTransaction中的TransactionManager是DefaultTransactionManager

DefaultTransactionManager中提供了事务相关的底层操作

io.seata.tm.api.DefaultGlobalTransaction#commit()

io.seata.tm.api.DefaultGlobalTransaction#rollback()的逻辑与commit()类似,都是重试调用transactionManager.rollback(xid)

全局事务扫描器部分的代码就看到这里,下面总结一下:

1、配置项seata.enabled=true 会触发 SeataAutoConfiguration 自动配置

2、SeataAutoConfiguration中创建了一个GlobalTransactionScanner

3、GlobalTransactionScanner继承了AbstractAutoProxyCreator,并实现InitializingBean接口

4、初始化TM、RM

5、由于继承了AbstractAutoProxyCreator,所以BeanFactory会调用GlobalTransactionScanner#方法postProcessAfterInitialization(),最终会调用GlobalTransactionScanner#wrapIfNecessary()来为目标对象创建代理对象

6、GlobalTransactionScanner#wrapIfNecessary()中创建了一个GlobalTransactionalInterceptor,GlobalTransactionalInterceptor是一个MethodInterceptor

7、在创建代理对象的时候,在AbstractAutoProxyCreator#wrapIfNecessary()方法中,为代理对象应用GlobalTransactionalInterceptor,于是所有目标对象上的方法调用就会转为调用GlobalTransactionalInterceptor#invoke()

8、GlobalTransactionalInterceptor#invoke()方法中,首先获取被调用的目标对象的Class和Method对象,然后检查目标方法或类上是否有@GlobalTransactional或@GlobalLock注解,而且配置项中不能禁用全局事务

9、如果加了@GlobalTransactional注解,则创建一个AspectTransactional,然后开始处理全局事务,默认传播特性是REQUIRED

10、如果加了@GlobalLock注解,则开始处理全局锁

11、处理全局事务就是直接调用事务模板中的execute方法,TransactionalTemplate#execute()是一个模板方法,其中定义了事务处理的流程。首先开启事务,然后执行业务逻辑,最后提交事务,异常回滚事务。

12、事务操作是在DefaultGlobalTransaction中处理的,最终处理在DefaultTransactionManager。DefaultTransactionManager负责同步远程调用,向TC发请求来开启、提交、回滚事务等操作

3. 数据库操作执行SQL语句

通过Java自带的JDBC操作数据库通常是这样的:

Class.forName(driverClass);
// 获取Connection
Connection connection = DriverManager.getConnection(url,user,password);
// 创建Statement或者PreparedStatement
Statement stmt = connection.createStatement();
stmt.execute(sql);

// PreparedStatement ps = connection.prepareStatement(sql);
// ps.execute();

MyBatis底层也是这一套

接下来看Seata是如何做的

首先是获取数据库连接Connection,前面已经说过了,调用DataSource的getConnection()方法底层是在代理对象SeataDataSourceProxy上调用getConnection()。SeataDataSourceProxy是接口,如果是AT模式,则这个数据源代理对象是DataSourceProxy

DataSourceProxy#getConnection()获取数据库连接

ConnectionProxy#createStatement()

ConnectionProxy#prepareStatement()

PreparedStatementProxy 继承自 StatementProxy,因此下面就直接看PreparedStatementProxy如何执行SQL

PreparedStatementProxy#execute()

ExecuteTemplate#execute()  是一个模板方法

挑一个看看吧,就挑UpdateExecutor

UpdateExecutor构造方法中一直调父类的构造法,既然如此,那么直接看BaseTransactionalExecutor

UpdateExecutor#execute()

这个方法时从BaseTransactionalExecutor那里继承来的,又是一个模板方法,可见设计模式是多么重要

AbstractDMLBaseExecutor#doExecute()

AbstractDMLBaseExecutor#executeAutoCommitTrue()

ConnectionProxy#changeAutoCommit()

现在事务自动提交已经被Seata改成false了

UpdateExecutor#beforeImage()

BaseTransactionalExecutor#prepareUndoLog()

接下来,提交事务

ConnectionProxy#commit()

ConnectionProxy#processGlobalTransactionCommit() 处理全局事务提交

分支事务提交以后,业务数据更改和undo_log就都提交了

回想一下,为什么在执行业务修改前要先将默认的自动提交改成手动提交,最后再改成自动提交呢?

因为,要将业务数据修改和插入undo_log放在同一个事务里,一起提交

这一切都归功于代理

回顾一下整个调用链

结合之前的案例,AT模式TC、TM、RM三者的交互应该是这样的:

问题一:为什么在执行的时候,先将数据库自动提交autoCommit设为false,最后再改成true呢?

答:因为,需要将undo_log和业务数据修改放到同一个事务中,这样可以保证业务数据修改成功后undo_log必然插入成功,所以Seata要将其改为手动提交。最后再改成true是因为默认autoCommit就是true,这样可以不影响其它业务。

问题二:什么情况下ConnectionContext中xid=null,且isGlobalLockRequire=true呢?或者换一种问法,什么情况下不在全局事务中,当仍然需要全局锁呢?

答:当业务方法上不加@GlobalTransactional,而是只加了@GlobalLock注解的情况下,就会出现上述情况,也就会执行 ConnectionProxy#processLocalCommitWithGlobalLocks()方法,在事务提交前检查全局锁,这样设计的目的是在AT模式下,不出现脏读、脏写。由于数据源被代理了,当一个加了@GlobalTransactional的全局事务,与另一个加了@GlobalTransactional或@GlobalLock注解的事务在本地事务提交前就会检查全局锁,要先获得全局锁才能提交本地事务,这样就避免了脏读脏写,从而相当于实现了全局事务的读已提交隔离级别。参见:https://seata.io/zh-cn/blog/seata-at-lock.html

关于Seata 1.5.2 Client端的源码学习就先到这里,欢迎交流~

如果你都已经看到了这里,不妨给我点个赞吧?

 

有关Seata 1.5.2 源码学习的更多相关文章

  1. UE4 源码阅读:从引擎启动到Receive Begin Play - 2

    一、引擎主循环UE版本:4.27一、引擎主循环的位置:Launch.cpp:GuardedMain函数二、、GuardedMain函数执行逻辑:1、EnginePreInit:加载大多数模块int32ErrorLevel=EnginePreInit(CmdLine);PreInit模块加载顺序:模块加载过程:(1)注册模块中定义的UObject,同时为每个类构造一个类默认对象(CDO,记录类的默认状态,作为模板用于子类实例创建)(2)调用模块的StartUpModule方法2、FEngineLoop::Init()1、检查Engine的配置文件找出使用了哪一个GameEngine类(UGame

  2. LC滤波器设计学习笔记(一)滤波电路入门 - 2

    目录前言滤波电路科普主要分类实际情况单位的概念常用评价参数函数型滤波器简单分析滤波电路构成低通滤波器RC低通滤波器RL低通滤波器高通滤波器RC高通滤波器RL高通滤波器部分摘自《LC滤波器设计与制作》,侵权删。前言最近需要学习放大电路和滤波电路,但是由于只在之前做音乐频谱分析仪的时候简单了解过一点点运放,所以也是相当从零开始学习了。滤波电路科普主要分类滤波器:主要是从不同频率的成分中提取出特定频率的信号。有源滤波器:由RC元件与运算放大器组成的滤波器。可滤除某一次或多次谐波,最普通易于采用的无源滤波器结构是将电感与电容串联,可对主要次谐波(3、5、7)构成低阻抗旁路。无源滤波器:无源滤波器,又称

  3. CAN协议的学习与理解 - 2

    最近在学习CAN,记录一下,也供大家参考交流。推荐几个我觉得很好的CAN学习,本文也是在看了他们的好文之后做的笔记首先是瑞萨的CAN入门,真的通透;秀!靠这篇我竟然2天理解了CAN协议!实战STM32F4CAN!原文链接:https://blog.csdn.net/XiaoXiaoPengBo/article/details/116206252CAN详解(小白教程)原文链接:https://blog.csdn.net/xwwwj/article/details/105372234一篇易懂的CAN通讯协议指南1一篇易懂的CAN通讯协议指南1-知乎(zhihu.com)视频推荐CAN总线个人知识总

  4. 深度学习部署:Windows安装pycocotools报错解决方法 - 2

    深度学习部署:Windows安装pycocotools报错解决方法1.pycocotools库的简介2.pycocotools安装的坑3.解决办法更多Ai资讯:公主号AiCharm本系列是作者在跑一些深度学习实例时,遇到的各种各样的问题及解决办法,希望能够帮助到大家。ERROR:Commanderroredoutwithexitstatus1:'D:\Anaconda3\python.exe'-u-c'importsys,setuptools,tokenize;sys.argv[0]='"'"'C:\\Users\\46653\\AppData\\Local\\Temp\\pip-instal

  5. ruby - 我正在学习编程并选择了 Ruby。我应该升级到 Ruby 1.9 吗? - 2

    我完全不是程序员,正在学习使用Ruby和Rails框架进行编程。我目前正在使用Ruby1.8.7和Rails3.0.3,但我想知道我是否应该升级到Ruby1.9,因为我真的没有任何升级的“遗留”成本。缺点是什么?我是否会遇到与普通gem的兼容性问题,或者甚至其他我不太了解甚至无法预料的问题? 最佳答案 你应该升级。不要坚持从1.8.7开始。如果您发现不支持1.9.2的gem,请避免使用它们(因为它们很可能不被维护)。如果您对gem是否兼容1.9.2有任何疑问,您可以在以下位置查看:http://www.railsplugins.or

  6. ruby - 我如何学习 ruby​​ 的正则表达式? - 2

    如何学习ruby​​的正则表达式?(对于假人) 最佳答案 http://www.rubular.com/在Ruby中使用正则表达式时是一个很棒的工具,因为它可以立即将结果可视化。 关于ruby-我如何学习ruby​​的正则表达式?,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/1881231/

  7. 深度学习12. CNN经典网络 VGG16 - 2

    深度学习12.CNN经典网络VGG16一、简介1.VGG来源2.VGG分类3.不同模型的参数数量4.3x3卷积核的好处5.关于学习率调度6.批归一化二、VGG16层分析1.层划分2.参数展开过程图解3.参数传递示例4.VGG16各层参数数量三、代码分析1.VGG16模型定义2.训练3.测试一、简介1.VGG来源VGG(VisualGeometryGroup)是一个视觉几何组在2014年提出的深度卷积神经网络架构。VGG在2014年ImageNet图像分类竞赛亚军,定位竞赛冠军;VGG网络采用连续的小卷积核(3x3)和池化层构建深度神经网络,网络深度可以达到16层或19层,其中VGG16和VGG

  8. 机器学习——时间序列ARIMA模型(四):自相关函数ACF和偏自相关函数PACF用于判断ARIMA模型中p、q参数取值 - 2

    文章目录1、自相关函数ACF2、偏自相关函数PACF3、ARIMA(p,d,q)的阶数判断4、代码实现1、引入所需依赖2、数据读取与处理3、一阶差分与绘图4、ACF5、PACF1、自相关函数ACF自相关函数反映了同一序列在不同时序的取值之间的相关性。公式:ACF(k)=ρk=Cov(yt,yt−k)Var(yt)ACF(k)=\rho_{k}=\frac{Cov(y_{t},y_{t-k})}{Var(y_{t})}ACF(k)=ρk​=Var(yt​)Cov(yt​,yt−k​)​其中分子用于求协方差矩阵,分母用于计算样本方差。求出的ACF值为[-1,1]。但对于一个平稳的AR模型,求出其滞

  9. Unity Shader 学习笔记(5)Shader变体、Shader属性定义技巧、自定义材质面板 - 2

    写在之前Shader变体、Shader属性定义技巧、自定义材质面板,这三个知识点任何一个单拿出来都是一套知识体系,不能一概而论,本文章目的在于将学习和实际工作中遇见的问题进行总结,类似于网络笔记之用,方便后续回顾查看,如有以偏概全、不祥不尽之处,还望海涵。1、Shader变体先看一段代码......Properties{ [KeywordEnum(on,off)]USL_USE_COL("IsUseColorMixTex?",int)=0 [Toggle(IS_RED_ON)]_IsRed("IsRed?",int)=0}......//中间省略,后续会有完整代码 #pragmamulti_c

  10. elasticsearch源码关于TransportSearchAction【阶段三】 - 2

    1.回顾.TransportServicepublicclassTransportServiceextendsAbstractLifecycleComponentTransportService:方法:1publicfinalTextendsTransportResponse>voidsendRequest(finalTransport.Connectionconnection,finalStringaction,finalTransportRequestrequest,finalTransportRequestOptionsoptions,TransportResponseHandlerT>

随机推荐