草庐IT

@Async并发线程异步处理

让你变好的过程从来都不会很舒服 2023-03-28 原文

一、@Async介绍

“异步调用”对应的是“同步调用”,同步调用指程序按照定义顺序依次执行,每一行程序都必须等待上一行程序执行完成之后才能执行;异步调用指程序在顺序执行时,不等待异步调用的语句返回结果就执行后面的程序。

顾名思义,@Async是用来实现异步的。基于@Async的方法,称之为异步方法。这些方法将在执行的时候,将会在独立的线程中被执行,调用者无需等待它的完成,即可继续其他的操作。

假如我们有一个Task类,其中有三个任务需要异步执行,那么我们就可以将这些任务方法标上@Async注解,使其成为异步方法。代码如下:

@Component
public class AsyncTask {
    private static Random random = new Random();

    @Async
    public void doTaskOne() throws Exception {
        System.out.println("开始做任务一");
        long start = System.currentTimeMillis();
        Thread.sleep(random.nextInt(10000));
        long end = System.currentTimeMillis();
        System.out.println("完成任务一,耗时:" + (end - start) + "毫秒");
    }
 
    @Async
    public void doTaskTwo() throws Exception {
        System.out.println("开始做任务二");
        long start = System.currentTimeMillis();
        Thread.sleep(random.nextInt(10000));
        long end = System.currentTimeMillis();
        System.out.println("完成任务二,耗时:" + (end - start) + "毫秒");
    }
 
    @Async
    public void doTaskThree() throws Exception {
        System.out.println("开始做任务三");
        long start = System.currentTimeMillis();
        Thread.sleep(random.nextInt(10000));
        long end = System.currentTimeMillis();
        System.out.println("完成任务三,耗时:" + (end - start) + "毫秒");
    }
}

为了让@Async注解能够生效,还需要在Spring Boot的主程序中配置@EnableAsync,如下所示:


@SpringBootApplication
@EnableAsync
public class Application {
  public static void main(String[] args) {
    SpringApplication.run(Application.class, args);
  }
}

然后我们可以写一个单元测试进行测试一下:


@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = Application.class)
public class ApplicationTests {
    @Autowired
    private Task task;
 
    @Test
    public void test() throws Exception {
    task.doTaskOne();
    task.doTaskTwo();
    task.doTaskThree();
    }
}

这时你会发现,你的异步线程还没执行完毕 ,主线程就已经执行完了,导致你想要输出的语句没有在你主线程结束前及时输出等一系列问题,这时候就需要Future来协助你了。

二、Future介绍

Future提供了三种功能: - 判断任务是否完成; - 能够中断任务; - 能够获取任务执行结果
它声明这样的五个方法:

  • cancel方法用来取消任务,如果取消任务成功则返回true,如果取消任务失败则返回false。
  • isCancelled方法表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true。
  • isDone方法表示任务是否已经完成,若任务完成,则返回true;
  • get()方法用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回;
  • get(long timeout, TimeUnit unit)用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null。

这样我们就能增加方法判断异步调用是否结束

    @Async
    public Future<String> doTaskOne() throws Exception {
        System.out.println("开始做任务一");
        long start = System.currentTimeMillis();
        Thread.sleep(random.nextInt(10000));
        long end = System.currentTimeMillis();
        System.out.println("完成任务一,耗时:" + (end - start) + "毫秒");
        return new AsyncResult<>("任务一完成");
    }

单元测试方法:

    @Test
    public void asyncTaskTest() throws Exception {
        long start = System.currentTimeMillis();
 
        Future<String> task1 = asyncTask.doTaskOne();
        Future<String> task2 = asyncTask.doTaskTwo();
        Future<String> task3 = asyncTask.doTaskThree();
 
        // 三个任务都调用完成,退出循环等待
        while (!task1.isDone() || !task2.isDone() || !task3.isDone()) {
            Thread.sleep(1000);
        }
 
        long end = System.currentTimeMillis();
        System.out.println("任务全部完成,总耗时:" + (end - start) + "毫秒");
    }

结果:

开始做任务一
开始做任务二
开始做任务三
完成任务二,耗时:5352毫秒
完成任务一,耗时:7190毫秒
完成任务三,耗时:7525毫秒
任务全部完成,总耗时:8004毫秒

到这里 一个简单的线程异步调用就结束了,但是,还可以进行优化处理,就是增加线程池,因为这样就可以自己规划线程创建的数量,进行资源效率利用的最大化处理。下面的Demo就是一个很好的例子;

三、线程并发处理Demo

  • 1、生成线程配置类及配置文件
package com.zfsoft.async;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

/**
 * 异步线程池配置
 * @author jiaq
 *
 */
public class OneWindowAsyncConfigurer implements AsyncConfigurer{

    @Value("${zfsoft.threadPool.corePoolSize: 20}")
    private int corePoolSize;

    @Value("${zfsoft.threadPool.queueCapacity: 100}")
    private int queueCapacity;

    @Value("${zfsoft.threadPool.maxPoolSize: 80}")
    private int maxPoolSize;

    @Value("${zfsoft.threadPool.keepAliveSeconds: 100}")
    private int keepAliveSeconds;

    @Bean(name = "asyncOneWindowPool")
    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //核心线程数50:线程池创建时候初始化的线程数
        executor.setCorePoolSize(corePoolSize);
        //用来缓冲执行任务的队列最大长度,一般需要设置值>=notifyScheduledMainExecutor.maxNum;默认为Integer.MAX_VALUE
        executor.setQueueCapacity(queueCapacity);
        // 线程池最大的线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程,默认为Integer.MAX_VALUE
        executor.setMaxPoolSize(maxPoolSize);
        //当超过了核心线程出之外的线程在空闲时间到达之后会被销毁,默认为60s
        executor.setKeepAliveSeconds(keepAliveSeconds);
        //线程池名的前缀
        executor.setThreadNamePrefix("SpringAsyncThread-");
        //线程池关闭的时候等待所有任务都完成再继续销毁其他的Bean
        executor.setWaitForTasksToCompleteOnShutdown(true);
        // rejection-policy:当pool已经达到max size的时候,如何处理新任务
        // CALLER_RUNS:不在新线程中执行任务,而是由调用者所在的线程来执行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return executor;
    }

}

asyncThreadConfig.properties 使用配置文件动态配置

## 多线程 异步资源全局配置

## 核心线程数
zfsoft.threadPool.corePoolSize=20
## 队列大小
zfsoft.threadPool.queueCapacity=100
## 线程池最大的线程数
zfsoft.threadPool.maxPoolSize=100
## 线程最大空闲时间
zfsoft.threadPool.keepAliveSeconds=100
  • 2、@Async注解进行spring注入asyncAnnotationBeanPostProcessor
<!-- @Value注解 -->
<bean id="propertyConfigurer" class = "org.springframework.beans.factory.config.PreferencesPlaceholderConfigurer"></bean>
<!-- async注入spring -->
<bean id="asyncAnnotationBeanPostProcessor" class="org.springframework.scheduling.annotation.AsyncAnnotationBeanPostProcessor"/></bean>
<!-- 异步线程池配置 -->
<bean id="oneWindowAsyncConfigurer" class="com.zfsoft.async.OneWindowAsyncConfigurer"/></bean>
  • 3、业务代码书写
/**
     * 数据查询
     * @param mv
     * @param queryParam
     * @param dirList
     */
    public void messageQuery(ModelAndView mv,BlCaseZJStatisticsParam queryParam,List<String> dirList,ConcurrentHashMap<String, String> messageMap) throws Exception{
        List<Future<String>> resultList = new ArrayList<>(15);
        long xmlSt = System.currentTimeMillis();

        // 异步调用
        Future<String> stringFuture1 = blCaseZJStatisticsService.wssbQuantityCensus1(queryParam, dirList,messageMap);
        resultList.add(stringFuture1);
        Future<String> stringFuture2 = blCaseZJStatisticsService.blgcjsQuantityCensus2(queryParam,dirList,messageMap);
        resultList.add(stringFuture2);

        // 当执行成功移除map中数据,全部移除说明执行完成
        Future<String> tempFuture = null;
        Iterator<Future<String>> iterator = null;
        while(true) {
            // 避免死循环,设置超时时间
            long xmlCur = System.currentTimeMillis();
            if (xmlCur - xmlSt > 15000) {
                System.out.println("统计超时!");
                throw new RuntimeException("统计超时!");
            }
            if (resultList==null || resultList.isEmpty()) {
                long xmlEt = System.currentTimeMillis();
                System.out.println("统计总耗时:"+ (xmlEt - xmlSt) +"ms");
                break;
            }
            iterator = resultList.iterator();
            while (iterator.hasNext()) {
                tempFuture = iterator.next();
                if (tempFuture.isDone()) {
                    iterator.remove();
                }
            }
        }
    }
    @Async("asyncOneWindowPool")
    @Override
    public Future<String> wssbQuantityCensus1( BlCaseZJStatisticsParam queryParam,List<String> dirList,Map<String, String> messageMap) throws Exception {
        List<Map<String, String>> wssbQuantityCensus = BlCaseZJStatisticsDao.getWssbQuantityCensus(queryParam,dirList);
        int size = 0;
        if( wssbQuantityCensus!=null && wssbQuantityCensus.size()>0 ){
            size = wssbQuantityCensus.size();
        }
        messageMap.put("wssbCount",String.valueOf(size));
        return new AsyncResult<>(size+"");
    }

    @Async("asyncOneWindowPool")
    @Override
    public Future<String> blgcjsQuantityCensus2( BlCaseZJStatisticsParam queryParam,List<String> dirList,Map<String, String> messageMap) throws Exception {
        List<Map<String,String>> projectList = BlCaseZJStatisticsDao.getByslgcjsxms(queryParam,dirList);
        int size = 0;
        if(projectList!=null&&projectList.size()>0){
            size = projectList.size();
        }
        messageMap.put("blgcjsbCount",String.valueOf(size));
        return new AsyncResult<>(size+"");
    }

大功告成

四、可能遇到的问题

1、@Async所修饰的函数不要定义为static类型,这样异步调用不会生效。
2、不要在同一个类里去调用@Async所修饰异步的方法,也就是调用的方法和被异步调用的方法在一个类中,这样可能导致调用失败,因为在一个类中调用,优先会进行直接调用,除非你构造一个本类的代理类。
3、多线程中如果需要使用map,不要使用hashmap,切记,因为Hashmap是线程不安全的 ,查询数据会有问题,建议使用ConcurrentHashMap

有关@Async并发线程异步处理的更多相关文章

  1. ruby - 如何指定 Rack 处理程序 - 2

    Rackup通过Rack的默认处理程序成功运行任何Rack应用程序。例如:classRackAppdefcall(environment)['200',{'Content-Type'=>'text/html'},["Helloworld"]]endendrunRackApp.new但是当最后一行更改为使用Rack的内置CGI处理程序时,rackup给出“NoMethodErrorat/undefinedmethod`call'fornil:NilClass”:Rack::Handler::CGI.runRackApp.newRack的其他内置处理程序也提出了同样的反对意见。例如Rack

  2. ruby-on-rails - 如何在 ruby​​ 中使用两个参数异步运行 exe? - 2

    exe应该在我打开页面时运行。异步进程需要运行。有什么方法可以在ruby​​中使用两个参数异步运行exe吗?我已经尝试过ruby​​命令-system()、exec()但它正在等待过程完成。我需要用参数启动exe,无需等待进程完成是否有任何ruby​​gems会支持我的问题? 最佳答案 您可以使用Process.spawn和Process.wait2:pid=Process.spawn'your.exe','--option'#Later...pid,status=Process.wait2pid您的程序将作为解释器的子进程执行。除

  3. ruby - RuntimeError(自动加载常量 Apps 多线程时检测到循环依赖 - 2

    我收到这个错误:RuntimeError(自动加载常量Apps时检测到循环依赖当我使用多线程时。下面是我的代码。为什么会这样?我尝试多线程的原因是因为我正在编写一个HTML抓取应用程序。对Nokogiri::HTML(open())的调用是一个同步阻塞调用,需要1秒才能返回,我有100,000多个页面要访问,所以我试图运行多个线程来解决这个问题。有更好的方法吗?classToolsController0)app.website=array.join(',')putsapp.websiteelseapp.website="NONE"endapp.saveapps=Apps.order("

  4. ruby - 如何让Ruby捕获线程中的语法错误 - 2

    我正在尝试使用ruby​​编写一个双线程客户端,一个线程从套接字读取数据并将其打印出来,另一个线程读取本地数据并将其发送到远程服务器。我发现的问题是Ruby似乎无法捕获线程内的错误,这是一个示例:#!/usr/bin/rubyThread.new{loop{$stdout.puts"hi"abc.putsefsleep1}}loop{sleep1}显然,如果我在线程外键入abc.putsef,代码将永远不会运行,因为Ruby将报告“undefinedvariableabc”。但是,如果它在一个线程内,则没有错误报告。我的问题是,如何让Ruby捕获这样的错误?或者至少,报告线程中的错误?

  5. ruby - 如何在 ruby​​ 中运行后台线程? - 2

    我是ruby​​的新手,我认为重新构建一个我用C#编写的简单聊天程序是个好主意。我正在使用Ruby2.0.0MRI(Matz的Ruby实现)。问题是我想在服务器运行时为简单的服务器命令提供I/O。这是从示例中获取的服务器。我添加了使用gets()获取输入的命令方法。我希望此方法在后台作为线程运行,但该线程正在阻塞另一个线程。require'socket'#Getsocketsfromstdlibserver=TCPServer.open(2000)#Sockettolistenonport2000defcommandsx=1whilex==1exitProgram=gets.chomp

  6. ruby-on-rails - 在 Ruby on Rails 中发送响应之前如何等待多个异步操作完成? - 2

    在我做的一些网络开发中,我有多个操作开始,比如对外部API的GET请求,我希望它们同时开始,因为一个不依赖另一个的结果。我希望事情能够在后台运行。我找到了concurrent-rubylibrary这似乎运作良好。通过将其混合到您创建的类中,该类的方法具有在后台线程上运行的异步版本。这导致我编写如下代码,其中FirstAsyncWorker和SecondAsyncWorker是我编写的类,我在其中混合了Concurrent::Async模块,并编写了一个名为“work”的方法来发送HTTP请求:defindexop1_result=FirstAsyncWorker.new.async.

  7. ruby - Rails 开发服务器、PDFKit 和多线程 - 2

    我有一个使用PDFKit呈现网页的pdf版本的Rails应用程序。我使用Thin作为开发服务器。问题是当我处于开发模式时。当我使用“bundleexecrailss”启动我的服务器并尝试呈现任何PDF时,整个过程会陷入僵局,因为当您呈现PDF时,会向服务器请求一些额外的资源,如图像和css,看起来只有一个线程.如何配置Rails开发服务器以运行多个工作线程?非常感谢。 最佳答案 我找到的最简单的解决方案是unicorn.geminstallunicorn创建一个unicorn.conf:worker_processes3然后使用它:

  8. Ruby-vips 图像处理库。有什么好的使用示例吗? - 2

    我对图像处理完全陌生。我对JPEG内部是什么以及它是如何工作一无所知。我想知道,是否可以在某处找到执行以下简单操作的ruby​​代码:打开jpeg文件。遍历每个像素并将其颜色设置为fx绿色。将结果写入另一个文件。我对如何使用ruby​​-vips库实现这一点特别感兴趣https://github.com/ender672/ruby-vips我的目标-学习如何使用ruby​​-vips执行基本的图像处理操作(Gamma校正、亮度、色调……)任何指向比“helloworld”更复杂的工作示例的链接——比如ruby​​-vips的github页面上的链接,我们将不胜感激!如果有ruby​​-

  9. ruby - Faye WebSocket,关闭处理程序被触发后重新连接到套接字 - 2

    我有一个super简单的脚本,它几乎包含了FayeWebSocketGitHub页面上用于处理关闭连接的内容:ws=Faye::WebSocket::Client.new(url,nil,:headers=>headers)ws.on:opendo|event|p[:open]#sendpingcommand#sendtestcommand#ws.send({command:'test'}.to_json)endws.on:messagedo|event|#hereistheentrypointfordatacomingfromtheserver.pJSON.parse(event.d

  10. ruby - 如何使用 Ruby HTTP::Net 处理 404 错误? - 2

    我正在尝试解析网页,但有时会收到404错误。这是我用来获取网页的代码:result=Net::HTTP::getURI.parse(URI.escape(url))如何测试result是否为404错误代码? 最佳答案 像这样重写你的代码:uri=URI.parse(url)result=Net::HTTP.start(uri.host,uri.port){|http|http.get(uri.path)}putsresult.codeputsresult.body这将打印状态码和正文。

随机推荐