
🏆今日学习目标:
🍀JTA事务详解
✅创作者:林在闪闪发光
⏰预计时间:30分钟
🎉个人主页:林在闪闪发光的个人主页🍁林在闪闪发光的个人社区,欢迎你的加入: 林在闪闪发光的社区
目录
Atomikos TransactionManager 实战
鸡蛋,从外打破是食物,从内打破是生命,
人生也是如此,从外打破是压力,从内打破是成长
最近回顾 事务相关的设计与实现,发现 Spring 事务设计的最初目的是为了统一 Java 中 JDBC、JTA 与 JPA 事务的使用方式,并且其实现参考了 JTA 规范。大多数人对 JDBC 都比较熟悉,而 JTA 和 JPA 由于使用较少,很多人对其比较陌生,尤其是 JTA。
接下来会跟大家说明其他事务 像JDBC 事务 Hibernate 事务 EJB 事务 都会和大家一一介绍
网上 JTA 的相关文章,要么参照规范照本宣科的把原理简单介绍,要么就直接上 Spring Boot 整合 JTA 框架的代码,由于知识跨度比较大,增加了对 JTA 理解的难度。经过几天的不懈努力,查阅相关资料,我也对 JTA 有了一定的认识,这里将知识结构以循序渐进的方式进行介绍,也避免小伙伴们走弯路。
JTA 全称 Java Transaction API,是 X/OPEN CAE 规范中分布式事务 XA 规范在 Java 中的映射,是 Java 中使用事务的标准 API,同时支持单机事务与分布式事务。
作为 J2EE 平台规范的一部分,JTA 与 JDBC 类似,自身只提供了一组 Java 接口,需要由供应商来实现这些接口,与 JDBC 不同的是这些接口需要由不同的供应商来实现。
JTA 定义了分布式事务的 5 种角色,不同角色关注不同的内容,如下图所示。

1. 事务管理器
事务管理器 Transaction Manager 是分布式事务的核心,在 JTA 中使用 TransactionManager 接口表示,提供了事务操作、资源管理、同步、事务传播等功能,等同于 Spring 中的 PlatformTransactionManager。
2. 资源管理器
资源管理器 Resource Manager 提供了对资源访问的能力,典型的代表是关系型数据库、消息队列,在 JTA 中使用接口 XAResource 表示,通常通过一个资源适配器来实现,例如 JDBC 中的数据库驱动。
3. 通信资源管理器
通信资源管理器用于支持跨应用分布式事务的事务管理器之间的通信,JTA 规定它实现 JTS 规范定义的接口即可,通常用户不用关心。
4. 应用
使用分布式事务的应用,应用可以通过 JTA 规范中的 UserTransaction 接口来操作事务。
5. 应用服务器
应用的运行环境,JTA 规定事务管理器应该由应用服务器来实现,如 jboss、weblogic、websphere,不过并非所有的应用服务器都实现了事务管理器,如 Tomcat。如果想在标准环境使用 JTA,可以使用支持 JTA 的第三发类库,如 Atomikos、Bitronix。
事务中有一个特性是原子性,它表示事务中的所有操作要么全部完成,要么全部不完成。事务管理器通过两阶段提交实现这一特性。
两阶段提交将对事务的提交分成两部分,分别为准备阶段和提交阶段。
在准备阶段,事务管理器向资源管理器(如 JMS 消息队列或数据库)询问是否同意提交事务,如果资源管理器回复同意,则表示资源管理器可以将资源持久化。
在提交阶段,如果所有的资源管理器都回复同意,则事务管理器向所有的资源管理器发出提交请求,否则事务管理器向资源管理器发出回滚请求。
正常情况下的两阶段提交可以用如下的图来表示。

准备阶段失败导致事务回滚的两阶段提交过程可以用如下图来表示。

细心的小伙伴可能会想到一个问题,虽然准备阶段事务管理器与资源管理器正常交互,不过在提交阶段如果发生一些意外导致事务管理器与资源管理器之间的通信中断怎么办呢?这很可能导致分布式事务违反原子性。
- 如果资源管理器已经接收到事务管理器的提交请求,事务管理器恢复后要求资源管理器回滚将抛出
HeuristicCommitException异常。- 如果资源管理器决定回滚,事务管理器恢复后要求资源管理器提交,将抛出
HeuristicRollbackException异常。- 如果一部分资源管理器将资源提交,另一部分资源管理器将资源回滚,将导致
HeuristicMixedException异常。
异常的场景可以用如下图来描述。

上面只是介绍了 JTA 相关的一些概念和基本原理,要想理解 JTA,还得看相关 API。
JTA 的 API 大概可以分别为三部分,如下。

其中,应用使用的接口 UserTransaction 也同样需要事务管理器来实现。
XAResource 与 Xid 接口是 JDK 已提供的接口,位于包 javax.transaction.xa 包中。

其他的接口需要单独引入 jta 依赖,坐标如下。
<dependency>
<groupId>jakarta.transaction</groupId>
<artifactId>jakarta.transaction-api</artifactId>
<version>1.3.3</version>
</dependency>
其他接口位于包 javax.transaction 中。

除了 JTA 规范中的 API,特定于实现的资源管理器通常还会实现其他与 JTA 相关的接口。
1. JDBC
对于关系型数据库来说,还需要实现接口 javax.sql.XADataSource 与 javax.sql.XAConnection。
public interface XADataSource extends CommonDataSource {
XAConnection getXAConnection() throws SQLException;
XAConnection getXAConnection(String user, String password) throws SQLException;
}
XAConnection 接口定义如下。
public interface XAConnection extends PooledConnection {
javax.transaction.xa.XAResource getXAResource() throws SQLException;
}
也就是说,通过 XADataSource 获取 XAConnection,通过 XAConnection 获取 XAResource,然后再进行资源的相关操作。例如,MySQL 的 JDBC 驱动包就进行了相关实现。

另外一些第三方包也会进行相关实现,然后将实现作为代理,调用真正的实现,如 Druid。

2. JMS
对于 JMS 规范来说,消息队列作为资源管理器,除了要实现 XAResource 接口,还要实现 javax.jms.XASession 与 javax.jms.XAConnection 接口。这两个接口定义如下。
public interface XAConnection extends Connection {
XASession createXASession() throws JMSException;
Session createSession(boolean var1, int var2) throws JMSException;
}
public interface XASession extends Session {
Session getSession() throws JMSException;
XAResource getXAResource();
boolean getTransacted() throws JMSException;
void commit() throws JMSException;
void rollback() throws JMSException;
}
与 JDBC 类似,JMS 通过 XAConnection 获取 XASession,通过 XASession 获取 XAResource,然后再进行资源相关操作。
JTA API 围绕事务的整个生命周期,一般来说,用户不用关心具体的 API,不过只有了解 JTA API 的设计才能理解其设计与实现。
事务管理器可以创建新的事务,并设置事务的相关属性,还允许应用获取创建后的事务,并且将事务与线程绑定。具体有以下方法。
Transaction.commit、XAResource.prepare、XAResource.commit 方法调用。Transaction.rollback、XAResource.rollback 方法的调用。resume 方法参数恢复线程与事务的关系。实现将触发 Transaction.delistResource 与 XResource.end 方法的调用。Transaction.enlistResource、XAResource.start 方法的调用。Transaction 事务接口用于对活动的事务进行操作,这里活动的事务是指未提交的事务,对其他事务不可见。
XAResource.start 方开启事务分支。XAResource.end 方法调用结束事务分支。TransactionManager.commit 含义相同。TransactionManager.getStatus 含义相同。TransactionManager.rollback 含义相同。TransactionManager.setRollbackOnly 含义相同。表示事务与资源的关联,一个事务可以关联多个资源。可以由资源管理器或事务管理器来实现这个接口。接口方法如下:
XAResource 用来表示分布式事务角色中的资源管理器,资源适配器将实现 XAResource 接口以支持事务与资源的关联,允许不同的线程同时对 XAResource 进行操作,但一个 XAResource 同时只能关联一个事务。事务提交时,触发资源管理器准备与提交。具体方法如下:
Transaction.enlistResource 添加资源时判断资源是否已添加。这个接口比较简单,用来表示事务完成前后的回调,由应用实现这个接口。这个接口未持久化,崩溃恢复事务后将丢失 Synchronization 实例。接口方法如下:
功能受限的事务接口,暴露给应用使用,由事务管理器实现。
方法包括 begin、commit、rollback、setRollbackOnly、getStatus、setTransactionTimeout,含义与 Transaction 中的方法相同,不再赘述。
前面已经介绍了事务提交、回滚、异常场景下各组件的交互,事务提交之前的活动事务也有自己的交互流程。根据前面 API 的介绍,可以大概总结流程如下。

需要注意的是只有 Connection 被 close 才会调用 Transaction.delistResource 释放资源,这意味着应该在 try{}finaly{} 中的 finally 块关闭连接。
了解 JTA API 之后我们可以通过实战的方式加深理解,由于目前 EJB 容器慢慢淡出了大家的视野,我们使用事务管理器的实现 Atomikos 加以演示。
使用 UserTransaction 需要了解 Atomikos 提供的两个类。
UserTransactionImp:这个类实现是UserTransaction的实现,内部封装了TransactionManager。AtomikosDataSourceBean:这个类是DataSource的实现,内部封装了对XAResource的相关操作。
由于事务管理器和数据源都由 Atomikos 提供,因此其内部知道如何进行事务管理器、事务与资源之间的交互,例如可以将事务管理器设置为单例 bean,将事务/资源存到线程上下文。我们直接使用即可。
假定我们有一个 MySQL 数据库,数据库名为 test,表 user 数据结构如下。
create table user
(
id bigint unsigned auto_increment
primary key,
username varchar(20) null,
password varchar(20) null
)
我们可以测试使用 Atomikos 添加一条记录。首先引入依赖。
<dependency>
<groupId>jakarta.transaction</groupId>
<artifactId>jakarta.transaction-api</artifactId>
<version>1.3.3</version>
</dependency>
<dependency>
<groupId>com.atomikos</groupId>
<artifactId>transactions-jdbc</artifactId>
<version>4.0.6</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.27</version>
</dependency>
先提供一个获取数据源的静态方法。
public class Application {
private static DataSource getDataSource() {
Properties properties = new Properties();
properties.put("url", "jdbc:mysql://127.0.0.1:3306/test");
properties.put("user", "root");
properties.put("password", "12345678");
AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
ds.setXaDataSourceClassName("com.mysql.cj.jdbc.MysqlXADataSource");
ds.setXaProperties(properties);
ds.setUniqueResourceName("resourceName");
ds.setPoolSize(10);
ds.setBorrowConnectionTimeout(60);
return ds;
}
}
可以看到,使用 Atomikos 提供的 UserTransaction 进行事务操作方式与普通的 JDBC 基本一致,只是使用了 Atomikos 提供的数据源获取连接,然后在进行 JDBC 操作前后添加了使用 UserTransaction 开启/结束事务的逻辑。
这里只是使用了一个数据源,也可以使用多个数据源开启分布式事务。
如果不想使用 AtomikosDataSourceBean,也可以手动调用 JTA 的 API,标准环境使用 TransactionManager 的实现类 UserTransactionManager 即可,Web 环境也可以切换为 J2eeTransactionManager。示例代码如下。
public class Application {
public static void main(String[] args) throws Exception {
TransactionManager tm = new UserTransactionManager();
tm.begin();
// 使用 MySQL XADataSource 的实现
MysqlXADataSource ds = new MysqlXADataSource();
ds.setURL("jdbc:mysql://127.0.0.1:3306/test");
ds.setUser("root");
ds.setPassword("12345678");
Connection conn = null;
PreparedStatement ps = null;
boolean error = false;
try {
// 获取 XAResource
XAConnection xaconn = ds.getXAConnection();
XAResource xares = xaconn.getXAResource();
// 从事务管理器中获取事务
Transaction tx = tm.getTransaction();
// 事务关联资源
tx.enlistResource(xares);
conn = xaconn.getConnection();
ps = conn.prepareStatement("insert into user(username,password) values('hkp','123')");
ps.executeUpdate();
// 事务与资源解除关联
tx.delistResource(xares, XAResource.TMSUCCESS);
} catch (Exception e) {
error = true;
} finally {
// 先使用事务管理器完成事务
if (tm.getStatus() != Status.STATUS_NO_TRANSACTION) {
if (error) {
// 遇到异常回滚事务
tm.rollback();
} else {
// 正常提交事务
tm.commit();
}
}
// 最后再关闭 JDBC 中的 Statement 和 Connection
if (ps != null) {
ps.close();
}
if (conn != null) {
conn.close();
}
}
}
}
代码完全遵循前面的示意图。
TransactionManager。TransactionManager.begin 方法开启事务。XADataSource 并获取 XAResource。Transaction 并将其与 XResource 关联。Spring 对 JDBC、JTA、JPA 的事务进行封装,提供了自己的事务管理器。
首先引入 Spring 相关依赖。
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
<version>5.2.6.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.2.6.RELEASE</version>
</dependency>
对于 JTA 来说,配置如下。
@Configuration
@EnableTransactionManagement
public class JTAConfig {
@Bean(initMethod = "init", destroyMethod = "close")
public AtomikosDataSourceBean dataSource() {
Properties properties = new Properties();
properties.put("url", "jdbc:mysql://127.0.0.1:3306/test");
properties.put("user", "root");
properties.put("password", "12345678");
AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
ds.setXaDataSourceClassName("com.mysql.cj.jdbc.MysqlXADataSource");
ds.setXaProperties(properties);
ds.setUniqueResourceName("resourceName");
ds.setPoolSize(10);
ds.setBorrowConnectionTimeout(60);
return ds;
}
@Bean(initMethod = "init", destroyMethod = "close")
public UserTransactionManager userTransactionManager() throws SystemException {
UserTransactionManager userTransactionManager = new UserTransactionManager();
userTransactionManager.setTransactionTimeout(300);
userTransactionManager.setForceShutdown(true);
return userTransactionManager;
}
@Bean
public JtaTransactionManager jtaTransactionManager() throws SystemException {
JtaTransactionManager jtaTransactionManager = new JtaTransactionManager();
jtaTransactionManager.setTransactionManager(userTransactionManager());
jtaTransactionManager.setUserTransaction(userTransactionManager());
return jtaTransactionManager;
}
}
然后定义我们操作数据库的 UserService 如下。
@Service
public class UserService {
@Autowired
private DataSource dataSource;
@Transactional(rollbackFor = Exception.class)
public void testInsert() {
try (Connection conn = dataSource.getConnection();
PreparedStatement ps = conn.prepareStatement("insert into user(username,password) values('kkk','789')")) {
int count = ps.executeUpdate();
System.out.println("count" + count);
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
}
最后运行测试类。
public class Application {
public static void main(String[] args) throws Exception {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
context.scan("com.lin.demo");
context.refresh();
UserService userService = context.getBean(UserService.class);
userService.testInsert();
context.close();
}
}
成功将数据插入数据库,如果事务方法抛出异常则不会提交事务到数据库。
Spring Boot 环境下的 Atomikos 使用较为简单,首先引入相关依赖。
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.27</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>
然后在 application.properties 进行数据源相关配置。
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/test
spring.datasource.username=root
spring.datasource.password=root1234
这样就完成了。
- 引入
spring-boot-starter-jta-atomikos之后 Spring 会自动配置JtaTransactionManager和AtomikosDataSourceBean。- 引入
spring-boot-starter-jdbc则是为了引入事务相关依赖与功能特性。
仍然使用上述示例中的 UserService,修改测试类如下。
@SpringBootApplication
public class Application implements CommandLineRunner {
public static void main(String[] args) throws Exception {
SpringApplication.run(Application.class, args);
}
@Autowired
private UserService userService;
@Override
public void run(String... args) throws Exception {
userService.testInsert();
}
}

我发现ActiveRecord::Base.transaction在复杂方法中非常有效。我想知道是否可以在如下事务中从AWSS3上传/删除文件:S3Object.transactiondo#writeintofiles#raiseanexceptionend引发异常后,每个操作都应在S3上回滚。S3Object这可能吗?? 最佳答案 虽然S3API具有批量删除功能,但它不支持事务,因为每个删除操作都可以独立于其他操作成功/失败。该API不提供任何批量上传功能(通过PUT或POST),因此每个上传操作都是通过一个独立的API调用完成的
我真的很习惯使用Ruby编写以下代码:my_hash={}my_hash['test']=1Java中对应的数据结构是什么? 最佳答案 HashMapmap=newHashMap();map.put("test",1);我假设? 关于java-等价于Java中的RubyHash,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/22737685/
我正在尝试使用boilerpipe来自JRuby。我看过guide从JRuby调用Java,并成功地将它与另一个Java包一起使用,但无法弄清楚为什么同样的东西不能用于boilerpipe。我正在尝试基本上从JRuby中执行与此Java等效的操作:URLurl=newURL("http://www.example.com/some-location/index.html");Stringtext=ArticleExtractor.INSTANCE.getText(url);在JRuby中试过这个:require'java'url=java.net.URL.new("http://www
我有一个涉及多台机器、消息队列和事务的问题。因此,例如用户点击网页,点击将消息发送到另一台机器,该机器将付款添加到用户的帐户。每秒可能有数千次点击。事务的所有方面都应该是容错的。我以前从未遇到过这样的事情,但一些阅读表明这是一个众所周知的问题。所以我的问题。我假设安全的方法是使用两阶段提交,但协议(protocol)是阻塞的,所以我不会获得所需的性能,我是否正确?我通常写Ruby,但似乎Redis之类的数据库和Rescue、RabbitMQ等消息队列系统对我的帮助不大——即使我实现某种两阶段提交,如果Redis崩溃,数据也会丢失,因为它本质上只是内存。所有这些让我开始关注erlang和
我只想对我一直在思考的这个问题有其他意见,例如我有classuser_controller和classuserclassUserattr_accessor:name,:usernameendclassUserController//dosomethingaboutanythingaboutusersend问题是我的User类中是否应该有逻辑user=User.newuser.do_something(user1)oritshouldbeuser_controller=UserController.newuser_controller.do_something(user1,user2)我
什么是ruby的rack或python的Java的wsgi?还有一个路由库。 最佳答案 来自Python标准PEP333:Bycontrast,althoughJavahasjustasmanywebapplicationframeworksavailable,Java's"servlet"APImakesitpossibleforapplicationswrittenwithanyJavawebapplicationframeworktoruninanywebserverthatsupportstheservletAPI.ht
这篇文章是继上一篇文章“Observability:从零开始创建Java微服务并监控它(一)”的续篇。在上一篇文章中,我们讲述了如何创建一个Javaweb应用,并使用Filebeat来收集应用所生成的日志。在今天的文章中,我来详述如何收集应用的指标,使用APM来监控应用并监督web服务的在线情况。源码可以在地址 https://github.com/liu-xiao-guo/java_observability 进行下载。摄入指标指标被视为可以随时更改的时间点值。当前请求的数量可以改变任何毫秒。你可能有1000个请求的峰值,然后一切都回到一个请求。这也意味着这些指标可能不准确,你还想提取最小/
HashMap中为什么引入红黑树,而不是AVL树呢1.概述开始学习这个知识点之前我们需要知道,在JDK1.8以及之前,针对HashMap有什么不同。JDK1.7的时候,HashMap的底层实现是数组+链表JDK1.8的时候,HashMap的底层实现是数组+链表+红黑树我们要思考一个问题,为什么要从链表转为红黑树呢。首先先让我们了解下链表有什么不好???2.链表上述的截图其实就是链表的结构,我们来看下链表的增删改查的时间复杂度增:因为链表不是线性结构,所以每次添加的时候,只需要移动一个节点,所以可以理解为复杂度是N(1)删:算法时间复杂度跟增保持一致查:既然是非线性结构,所以查询某一个节点的时候
目录前言滤波电路科普主要分类实际情况单位的概念常用评价参数函数型滤波器简单分析滤波电路构成低通滤波器RC低通滤波器RL低通滤波器高通滤波器RC高通滤波器RL高通滤波器部分摘自《LC滤波器设计与制作》,侵权删。前言最近需要学习放大电路和滤波电路,但是由于只在之前做音乐频谱分析仪的时候简单了解过一点点运放,所以也是相当从零开始学习了。滤波电路科普主要分类滤波器:主要是从不同频率的成分中提取出特定频率的信号。有源滤波器:由RC元件与运算放大器组成的滤波器。可滤除某一次或多次谐波,最普通易于采用的无源滤波器结构是将电感与电容串联,可对主要次谐波(3、5、7)构成低阻抗旁路。无源滤波器:无源滤波器,又称
@作者:SYFStrive @博客首页:HomePage📜:微信小程序📌:个人社区(欢迎大佬们加入)👉:社区链接🔗📌:觉得文章不错可以点点关注👉:专栏连接🔗💃:感谢支持,学累了可以先看小段由小胖给大家带来的街舞👉微信小程序(🔥)目录自定义组件-behaviors 1、什么是behaviors 2、behaviors的工作方式 3、创建behavior 4、导入并使用behavior 5、behavior中所有可用的节点 6、同名字段的覆盖和组合规则总结最后自定义组件-behaviors 1、什么是behaviorsbehaviors是小程序中,用于实现