草庐IT

Spring多线程事务解决方案

破晓.K 2023-05-14 原文

Spring多线程事务解决方案

多线程事务场景以及解决思路

多线程事务场景举例

对批量操作进行性能优化时会用到多线程来并行处理,从而提高运行效率。而时有业务要求保证批量操作事务的一致性,但不同线程所对应的是不同的事务,这就无法通过Spring提供的注解@Transactional来保证线程间的事务一致性。这也是Spring事务失效的一种情况。

线程间不是同一个事务

因为数据库连接Connection以及Spring事务的参数(事务名、事务是否只读、事务传播行为等)是保存在ThreadLocal中,不同的线程中保存的Connection是不同的,所以数据库层面对于不同的连接就无法满足事务。

解决思路(分布式事务思路)

既然Spring提供的声明式事务注解@Transactional无法满足多线程间的事务一致性,那可以试试通过编程式事务的方式来解决。主要思路:每个线程都开启各自的事务,待所有线程的业务执行完成,统一提交或回滚。
我的解决方案参考分布式事务2PC(Two-phase commit protocol),中文叫二阶段提交。需要注意的是2PC是同步阻塞协议,各个线程需要等待所有的线程执行完成后才能进行下一步操作,在使用线程池执行任务时,如果线程池的最大线程数小于任务列表的数量,就会发生“死锁”,即获取到线程的任务阻塞等待没有获取线程的任务执行完成,而没有获取线程的任务会在阻塞队列中等待空闲线程的调用。这种情况需要使用一阶段的超时机制来“解开”,超时机制会发送回滚命令,线程池收到后进行回滚,但这种情况任务始终无法提交,再次提交结果依然是等到超时再回滚。再使用中需要结合具体业务来对线程池参数以及数据库连接池参数进行合理的设置。如果这里听的优点迷,可以先看下面具体代码实现再来结合这段文字思考。

可以参考文章:面试必问:分布式事务六种解决方案

代码实现

我这里封装成了一个工具类,通过方法execute(List runnableList, Executor executor)传入任务列表和线程池,我的代码实现不具备任务编排的能力,有需要可以结合CompletableFuture类自己实现一个。

import lombok.extern.slf4j.Slf4j;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;

import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * @author poxiao
 * @create 2023-01-05 22:22
 * <p>
 * 多线程事务管理器
 * 基于分布式事务思想,采用2PC(Two-phase commit protocol)协议
 * 解决基于线程池的多线程事务一致性问题
 */
@Slf4j
public class MultiThreadingTransactionManager {

    /**
     * 事务管理器
     */
    private final PlatformTransactionManager transactionManager;

    /**
     * 超时时间
     */
    private final long timeout;

    /**
     * 时间单位
     */
    private final TimeUnit unit;

    /**
     * 一阶段门闩,(第一阶段的准备阶段),当所有子线程准备完成时(除“提交/回滚”操作以外的工作都完成),countDownLatch的值为0
     */
    private CountDownLatch oneStageLatch = null;

    /**
     * 二阶段门闩,(第二阶段的执行执行),主线程将不再等待子线程执行,直接判定总的任务执行失败,执行第二阶段让等待确认的线程进行回滚
     */
    private final CountDownLatch twoStageLatch = new CountDownLatch(1);

    /**
     * 是否提交事务,默认是true(当任一线程发生异常时,isSubmit会被设置为false,即回滚事务)
     */
    private final AtomicBoolean isSubmit = new AtomicBoolean(true);

    /**
     * 构造方法
     * @param transactionManager 事务管理器
     * @param timeout 超时时间
     * @param unit 时间单位
     */
    public MultiThreadingTransactionManager(PlatformTransactionManager transactionManager, long timeout, TimeUnit unit) {
        this.transactionManager = transactionManager;
        this.timeout = timeout;
        this.unit = unit;
    }

    /**
     * 线程池方式执行任务,可保证线程间的事务一致性
     * @param runnableList 任务列表
     * @param executor 线程池
     * @return
     */
    public boolean execute(List<Runnable> runnableList, ExecutorService executor) {

        // 排除null值
        runnableList.removeAll(Collections.singleton(null));

        // 属性初始化
        innit(runnableList.size());

        // 遍历任务列表并放入线程池
        for (Runnable runnable : runnableList) {
            // 创建线程
            Thread thread = new Thread() {
                @Override
                public void run() {
                    // 如果别的线程执行失败,则该任务就不需要再执行了
                    if (!isSubmit.get()) {
                        log.info("当前子线程执行中止,因为线程事务中有子线程执行失败");
                        oneStageLatch.countDown();
                        return;
                    }
                    // 开启事务
                    TransactionStatus transactionStatus = transactionManager.getTransaction(new DefaultTransactionDefinition());
                    try {
                        // 执行业务逻辑
                        runnable.run();
                    } catch (Exception e) {
                        // 执行体发生异常,设置回滚
                        isSubmit.set(false);
                        log.error("线程{}:业务发生异常,执行体:{}", Thread.currentThread().getName(), runnable);
                    }
                    // 计数器减一
                    oneStageLatch.countDown();
                    try {
                        //等待所有线程任务完成,监控是否有异常,有则统一回滚
                        twoStageLatch.await();
                        // 根据isSubmit值判断事务是否提交,可能是子线程出现异常,也有可能是子线程执行超时
                        if (isSubmit.get()) {
                            // 提交
                            transactionManager.commit(transactionStatus);
                            log.info("线程{}:事务提交成功,执行体:{}", Thread.currentThread().getName(), runnable);
                        } else {
                            // 回滚
                            transactionManager.rollback(transactionStatus);
                            log.info("线程{}:事务回滚成功,执行体:{}", Thread.currentThread().getName(), runnable);
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            executor.execute(thread);
        }

        /**
         * 主线程担任协调者,当第一阶段所有参与者准备完成,oneStageLatch的计数为0
         * 主线程发起第二阶段,执行阶段(提交或回滚),根据
         */
        try {
            // 主线程等待所有线程执行完成,超时时间设置为五秒
            oneStageLatch.await(timeout, unit);
            long count = oneStageLatch.getCount();
            System.out.println("countDownLatch值:" + count);
            // 主线程等待超时,子线程可能发生长时间阻塞,死锁
            if (count > 0) {
                // 设置为回滚
                isSubmit.set(false);
                log.info("主线线程等待超时,任务即将全部回滚");
            }
            twoStageLatch.countDown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 返回结果,是否执行成功,事务提交即为执行成功,事务回滚即为执行失败
        return isSubmit.get();
    }

    /**
     * 初始化属性
     * @param size 任务数量
     */
    private void innit(int size) {
        oneStageLatch = new CountDownLatch(size);
    }
}

注意1: 2PC是同步阻塞协议,各个任务会等待所有的任务完成准备阶段才能进一步执行,所以在使用中一定要给任务列表提供充足的空闲线程,比如任务列表长度为8,线程池最大线程数不能小于8,否则会使其中的几个任务得不到执行,而其他线程会一直进行等待。即使有一阶段超时处理,事务也始终得不到提交。
注意2: 如果你的任务是对数据库进行操作,需要考虑数据库连接是否充足,线程等待过程中不会释放数据库连接,如果Connection不够,即使任务被线程池调度执行,也会阻塞在获取数据库连接中,同样会发生“死锁”。

工具类使用演示

@Override
    public boolean addBatchByMultithreading2(List<User> users) {
        // 创建线程池
        ThreadPoolExecutor executor = UserServiceThreadPool.getExecutor();
        // 创建多线程事务管理器,传入事务管理器,指定超时时间为3秒
        MultiThreadingTransactionManager multiThreadingTransactionManage = new MultiThreadingTransactionManager(transactionManager, 3, TimeUnit.SECONDS);
        List<Runnable> runnableList = new ArrayList<>();
        // 模拟任务出现异常
        // runnableList.add(() -> {
        //     int a = 10 / 0;
        // });
        users.forEach((x) -> {
            runnableList.add(new Runnable() {
                @Override
                public void run() {
                    System.out.println("当前线程:" + Thread.currentThread().getName() + "插入数据:" + x);
                    userMapper.insert(x);
                }
            });
        });
        // 执行
        return multiThreadingTransactionManage.execute(runnableList, executor);
    }

我这里采用自定义线程池,线程池参数如下:

核心线程数为10、采用无界队列(可能会导致OOM,结合业务使用)

效果展示

任务业务正常,无异常抛出时正常提交事务情况

Controller层代码

Service层代码

数据库无记录

Postman测试

控制台打印日志提示事务提交成功,并且数据库新增8条记录

展示出现异常任务时回滚事务情况

Service层代码
控制台打印日志提示,多个线程事务进行回滚

有关Spring多线程事务解决方案的更多相关文章

  1. ruby - 在 jRuby 中使用 'fork' 生成进程的替代方案? - 2

    在MRIRuby中我可以这样做:deftransferinternal_server=self.init_serverpid=forkdointernal_server.runend#Maketheserverprocessrunindependently.Process.detach(pid)internal_client=self.init_client#Dootherstuffwithconnectingtointernal_server...internal_client.post('somedata')ensure#KillserverProcess.kill('KILL',

  2. ruby - 我可以使用 aws-sdk-ruby 在 AWS S3 上使用事务性文件删除/上传吗? - 2

    我发现ActiveRecord::Base.transaction在复杂方法中非常有效。我想知道是否可以在如下事务中从AWSS3上传/删除文件:S3Object.transactiondo#writeintofiles#raiseanexceptionend引发异常后,每个操作都应在S3上回滚。S3Object这可能吗?? 最佳答案 虽然S3API具有批量删除功能,但它不支持事务,因为每个删除操作都可以独立于其他操作成功/失败。该API不提供任何批量上传功能(通过PUT或POST),因此每个上传操作都是通过一个独立的API调用完成的

  3. ruby-on-rails - 带 Spring 锁的 Rails 4 控制台 - 2

    我正在使用Ruby2.1.1和Rails4.1.0.rc1。当执行railsc时,它被锁定了。使用Ctrl-C停止,我得到以下错误日志:~/.rvm/gems/ruby-2.1.1/gems/spring-1.1.2/lib/spring/client/run.rb:47:in`gets':Interruptfrom~/.rvm/gems/ruby-2.1.1/gems/spring-1.1.2/lib/spring/client/run.rb:47:in`verify_server_version'from~/.rvm/gems/ruby-2.1.1/gems/spring-1.1.

  4. ruby - RuntimeError(自动加载常量 Apps 多线程时检测到循环依赖 - 2

    我收到这个错误:RuntimeError(自动加载常量Apps时检测到循环依赖当我使用多线程时。下面是我的代码。为什么会这样?我尝试多线程的原因是因为我正在编写一个HTML抓取应用程序。对Nokogiri::HTML(open())的调用是一个同步阻塞调用,需要1秒才能返回,我有100,000多个页面要访问,所以我试图运行多个线程来解决这个问题。有更好的方法吗?classToolsController0)app.website=array.join(',')putsapp.websiteelseapp.website="NONE"endapp.saveapps=Apps.order("

  5. ruby - 分布式事务和队列,ruby,erlang,scala - 2

    我有一个涉及多台机器、消息队列和事务的问题。因此,例如用户点击网页,点击将消息发送到另一台机器,该机器将付款添加到用户的帐户。每秒可能有数千次点击。事务的所有方面都应该是容错的。我以前从未遇到过这样的事情,但一些阅读表明这是一个众所周知的问题。所以我的问题。我假设安全的方法是使用两阶段提交,但协议(protocol)是阻塞的,所以我不会获得所需的性能,我是否正确?我通常写Ruby,但似乎Redis之类的数据库和Rescue、RabbitMQ等消息队列系统对我的帮助不大——即使我实现某种两阶段提交,如果Redis崩溃,数据也会丢失,因为它本质上只是内存。所有这些让我开始关注erlang和

  6. 屏幕录制为什么没声音?检查这2项,轻松解决 - 2

    相信很多人在录制视频的时候都会遇到各种各样的问题,比如录制的视频没有声音。屏幕录制为什么没声音?今天小编就和大家分享一下如何录制音画同步视频的具体操作方法。如果你有录制的视频没有声音,你可以试试这个方法。 一、检查是否打开电脑系统声音相信很多小伙伴在录制视频后会发现录制的视频没有声音,屏幕录制为什么没声音?如果当时没有打开音频录制,则录制好的视频是没有声音的。因此,建议在录制前进行检查。屏幕上没有声音,很可能是因为你的电脑系统的声音被禁止了。您只需打开电脑系统的声音,即可录制音频和图画同步视频。操作方法:步骤1:点击电脑屏幕右下侧的“小喇叭”图案,在上方的选项中,选择“声音”。 步骤2:在“声

  7. 【高数】用拉格朗日中值定理解决极限问题 - 2

    首先回顾一下拉格朗日定理的内容:函数f(x)是在闭区间[a,b]上连续、开区间(a,b)上可导的函数,那么至少存在一个,使得:通过这个表达式我们可以知道,f(x)是函数的主体,a和b可以看作是主体函数f(x)中所取的两个值。那么可以有,  也就意味着我们可以用来替换 这种替换可以用在求某些多项式差的极限中。方法: 外层函数f(x)是一致的,并且h(x)和g(x)是等价无穷小。此时,利用拉格朗日定理,将原式替换为 ,再进行求解,往往会省去复合函数求极限的很多麻烦。使用要注意:1.要先找到主体函数f(x),即外层函数必须相同。2.f(x)找到后,复合部分是等价无穷小。3.要满足作差的形式。如果是加

  8. 深度学习部署:Windows安装pycocotools报错解决方法 - 2

    深度学习部署:Windows安装pycocotools报错解决方法1.pycocotools库的简介2.pycocotools安装的坑3.解决办法更多Ai资讯:公主号AiCharm本系列是作者在跑一些深度学习实例时,遇到的各种各样的问题及解决办法,希望能够帮助到大家。ERROR:Commanderroredoutwithexitstatus1:'D:\Anaconda3\python.exe'-u-c'importsys,setuptools,tokenize;sys.argv[0]='"'"'C:\\Users\\46653\\AppData\\Local\\Temp\\pip-instal

  9. ruby - 如何让Ruby捕获线程中的语法错误 - 2

    我正在尝试使用ruby​​编写一个双线程客户端,一个线程从套接字读取数据并将其打印出来,另一个线程读取本地数据并将其发送到远程服务器。我发现的问题是Ruby似乎无法捕获线程内的错误,这是一个示例:#!/usr/bin/rubyThread.new{loop{$stdout.puts"hi"abc.putsefsleep1}}loop{sleep1}显然,如果我在线程外键入abc.putsef,代码将永远不会运行,因为Ruby将报告“undefinedvariableabc”。但是,如果它在一个线程内,则没有错误报告。我的问题是,如何让Ruby捕获这样的错误?或者至少,报告线程中的错误?

  10. spring.profiles.active和spring.profiles.include的使用及区别说明 - 2

    转自:spring.profiles.active和spring.profiles.include的使用及区别说明下文笔者讲述spring.profiles.active和spring.profiles.include的区别简介说明,如下所示我们都知道,在日常开发中,开发|测试|生产环境都拥有不同的配置信息如:jdbc地址、ip、端口等此时为了避免每次都修改全部信息,我们则可以采用以上的属性处理此类异常spring.profiles.active属性例:配置文件,可使用以下方式定义application-${profile}.properties开发环境配置文件:application-dev

随机推荐