草庐IT

主线程等待所有子线程结束的4种方法

shallwe小威 2024-02-21 原文

目录


主线程等待所有子线程结束的4种方法,包括使用 CountDownLatchCyclicBarrierFuture.get()Completable.allOf()

主线程不等待子线程全部结束

public class WaitThreadsDemo {
    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 100L,
            TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new ThreadPoolExecutor.CallerRunsPolicy());

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 10; i++) {
            int index = i;
            executor.submit(() -> {
                System.out.println("子线程" + index);
            });
        }
        System.out.println("主线程-----");
    }
}

结果如下:

子线程0
子线程3
子线程5
主线程-----
子线程1
子线程7
子线程8
子线程9
子线程6
子线程4
子线程2

1、使用CountDownLatch

public class WaitThreadsDemo {
    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 100L,
            TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new ThreadPoolExecutor.CallerRunsPolicy());

    public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis();
        CountDownLatch count = new CountDownLatch(10);
        for (int i = 0; i < 10; i++) {
            int index = i;
            executor.submit(() -> {
                try {
                    Thread.sleep(200);
                    System.out.println("子线程" + index);
                    count.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        count.await();
        System.out.println("主线程-----");
        long end = System.currentTimeMillis();
        System.out.println("花费时间:" + (end - start));
    }
}

结果如下:

子线程4
子线程1
子线程0
子线程2
子线程3
子线程9
子线程5
子线程6
子线程7
子线程8
主线程-----
花费时间:493

2、同步屏障CyclicBarrier

2.1、CyclicBarrier使用

public class WaitThreadsDemo {
    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10, 100L,
            TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new ThreadPoolExecutor.CallerRunsPolicy());

    public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
        long start = System.currentTimeMillis();
        CyclicBarrier cyclicBarrier = new CyclicBarrier(11);
        for (int i = 0; i < 10; i++) {
            int index = i;
            executor.submit(() -> {
                try {
                    Thread.sleep(200);
                    System.out.println("子线程running" + index);
                    cyclicBarrier.await();
                    System.out.println("子线程end***" + index);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
        System.out.println("主线程running@@@");
        cyclicBarrier.await();
        System.out.println("主线程end-----");
        long end = System.currentTimeMillis();
        System.out.println("花费时间:" + (end - start));
    }
}

结果如下:

主线程running@@@
子线程running5
子线程running7
子线程running6
子线程running8
子线程running9
子线程running1
子线程running0
子线程running2
子线程running3
子线程running4
子线程end***4
主线程end-----
花费时间:289
子线程end***6
子线程end***5
子线程end***7
子线程end***8
子线程end***0
子线程end***3
子线程end***2
子线程end***9
子线程end***1

由代码和执行结果可以看出:

  1. 屏障拦截数要算上主线程即:屏障拦截数量=子线程数量+主线程数量(11=10+1)
  2. 主线程也要调用await()
  3. 主线程会等待所有子线程到达屏障(调用await()
  4. 主线程无法控制子线程到达屏障(调用await())后的操作,所以子线程调用await()要放在最后
  5. 如果使用线程池,线程池核心线程数要大于等于屏障拦截数

2.2、CyclicBarrier复用

public class WaitThreadsDemo {
    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 100L,
            TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new ThreadPoolExecutor.CallerRunsPolicy());

    public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
        long start = System.currentTimeMillis();
        CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> {
            System.out.println("主线程end----");
            long end = System.currentTimeMillis();
            System.out.println("花费时间:" + (end - start));
        });
        for (int i = 0; i < 10; i++) {
            int index = i;
            executor.submit(() -> {
                try {
                    Thread.sleep(200);
                    System.out.println("子线程running" + index);
                    cyclicBarrier.await();
                    System.out.println("子线程end***" + index);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
    }
}

结果如下:

2.3、CountDownLatch和CyclicBarrier的区别

  • CountDownLatch表示所有子线程一个一个执行完毕,子线程执行完之前主线程阻塞。
  • CyclicBarrier表示所有子线程全部到达一个障碍屏障前,然后一起执行完毕,和主线程无关。
  • CountDownLatch基于AQS实现,CyclicBarrier基于ReenTrantLock + Condition实现
    就像田径比赛一样,CountDownLatch每次countDown()减一表示一个参赛者冲过终点线,全部通过,比赛结束。
    CyclicBarrier相当于在终点线前设置一个障碍,所有参赛者到达障碍前,然后撤掉障碍手拉手通过终点线。
    CyclicBarrier的复用就跟“车满发车”一个道理,乘客陆续上车,车满发车。

3、使用Future.get()

public class WaitThreadsDemo {
    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10, 100L,
            TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new ThreadPoolExecutor.CallerRunsPolicy());

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        long start = System.currentTimeMillis();
        List<Future> futureList = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            int index = i;
            Future future = executor.submit(() -> {
                try {
                    Thread.sleep(200);
                    System.out.println("子线程running" + index);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
            futureList.add(future);
        }
        for (Future future : futureList) {
            //监听线程池子线程执行状态及执行结果。
            future.get();
        }
        System.out.println("主线程end-----");
        long end = System.currentTimeMillis();
        System.out.println("花费时间:" + (end - start));
    }
}

结果如下

子线程running7
子线程running4
子线程running6
子线程running5
子线程running1
子线程running3
子线程running2
子线程running8
子线程running9
子线程running0
主线程end-----
花费时间:303

4、使用Completable.allOf()

public class WaitThreadsDemo {
    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10, 100L,
            TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new ThreadPoolExecutor.CallerRunsPolicy());

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        long start = System.currentTimeMillis();
        List<CompletableFuture> futureList = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            int index = i;
            CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //模拟两次异常
                if (index % 7 ==0){
                    int tmp = index/0;
                }
                System.out.println("子线程running" + index);
                return "success";
            },executor).exceptionally(e-> e.getMessage());
            futureList.add(future);
        }
        //allOf()等待所有线程执行完毕
        CompletableFuture<Void> allFutures = CompletableFuture.allOf(futureList.toArray(new CompletableFuture[futureList.size()]));

        //但是allOf()拿不到执行结果,需要再次回调获取结果
        CompletableFuture<List<String>> finalFutures = allFutures.thenApply(v-> futureList.stream().map(CompletableFuture<String>::join).collect(Collectors.toList()));
        System.out.println(finalFutures.get());
        //也可以直接使用最初的futureList集合获取结果,如下两行:
        //List<String> resultList = futureList.stream().map(CompletableFuture<String>::join).collect(Collectors.toList());
        //System.out.println(resultList);
        
        System.out.println("主线程end-----");
        long end = System.currentTimeMillis();
        System.out.println("花费时间:" + (end - start));
    }
}

结果如下:

子线程running6
子线程running5
子线程running4
子线程running3
子线程running2
子线程running1
子线程running9
子线程running8
[java.lang.ArithmeticException: / by zero, success, success, success, success, success, success, java.lang.ArithmeticException: / by zero, success, success]
主线程end-----
花费时间:313

allOf()没有返回值,拿不到执行结果,需要再次使用回调函数获取最初futureList的执行结果。其实可以直接遍历futureList获取结果。
CompletableFuture的详细使用:CompletableFuture使用详解

有关主线程等待所有子线程结束的4种方法的更多相关文章

  1. ruby - 如何使用 Nokogiri 的 xpath 和 at_xpath 方法 - 2

    我正在学习如何使用Nokogiri,根据这段代码我遇到了一些问题:require'rubygems'require'mechanize'post_agent=WWW::Mechanize.newpost_page=post_agent.get('http://www.vbulletin.org/forum/showthread.php?t=230708')puts"\nabsolutepathwithtbodygivesnil"putspost_page.parser.xpath('/html/body/div/div/div/div/div/table/tbody/tr/td/div

  2. ruby - 如何从 ruby​​ 中的字符串运行任意对象方法? - 2

    总的来说,我对ruby​​还比较陌生,我正在为我正在创建的对象编写一些rspec测试用例。许多测试用例都非常基础,我只是想确保正确填充和返回值。我想知道是否有办法使用循环结构来执行此操作。不必为我要测试的每个方法都设置一个assertEquals。例如:describeitem,"TestingtheItem"doit"willhaveanullvaluetostart"doitem=Item.new#HereIcoulddotheitem.name.shouldbe_nil#thenIcoulddoitem.category.shouldbe_nilendend但我想要一些方法来使用

  3. ruby - 为什么我可以在 Ruby 中使用 Object#send 访问私有(private)/ protected 方法? - 2

    类classAprivatedeffooputs:fooendpublicdefbarputs:barendprivatedefzimputs:zimendprotecteddefdibputs:dibendendA的实例a=A.new测试a.foorescueputs:faila.barrescueputs:faila.zimrescueputs:faila.dibrescueputs:faila.gazrescueputs:fail测试输出failbarfailfailfail.发送测试[:foo,:bar,:zim,:dib,:gaz].each{|m|a.send(m)resc

  4. ruby - Facter::Util::Uptime:Module 的未定义方法 get_uptime (NoMethodError) - 2

    我正在尝试设置一个puppet节点,但ruby​​gems似乎不正常。如果我通过它自己的二进制文件(/usr/lib/ruby/gems/1.8/gems/facter-1.5.8/bin/facter)在cli上运行facter,它工作正常,但如果我通过由ruby​​gems(/usr/bin/facter)安装的二进制文件,它抛出:/usr/lib/ruby/1.8/facter/uptime.rb:11:undefinedmethod`get_uptime'forFacter::Util::Uptime:Module(NoMethodError)from/usr/lib/ruby

  5. Ruby 方法() 方法 - 2

    我想了解Ruby方法methods()是如何工作的。我尝试使用“ruby方法”在Google上搜索,但这不是我需要的。我也看过ruby​​-doc.org,但我没有找到这种方法。你能详细解释一下它是如何工作的或者给我一个链接吗?更新我用methods()方法做了实验,得到了这样的结果:'labrat'代码classFirstdeffirst_instance_mymethodenddefself.first_class_mymethodendendclassSecond使用类#returnsavailablemethodslistforclassandancestorsputsSeco

  6. ruby - 如何以所有可能的方式将字符串拆分为长度最多为 3 的连续子字符串? - 2

    我试图获取一个长度在1到10之间的字符串,并输出将字符串分解为大小为1、2或3的连续子字符串的所有可能方式。例如:输入:123456将整数分割成单个字符,然后继续查找组合。该代码将返回以下所有数组。[1,2,3,4,5,6][12,3,4,5,6][1,23,4,5,6][1,2,34,5,6][1,2,3,45,6][1,2,3,4,56][12,34,5,6][12,3,45,6][12,3,4,56][1,23,45,6][1,2,34,56][1,23,4,56][12,34,56][123,4,5,6][1,234,5,6][1,2,345,6][1,2,3,456][123

  7. ruby-on-rails - Rails 3.2.1 中 ActionMailer 中的未定义方法 'default_content_type=' - 2

    我在我的项目中添加了一个系统来重置用户密码并通过电子邮件将密码发送给他,以防他忘记密码。昨天它运行良好(当我实现它时)。当我今天尝试启动服务器时,出现以下错误。=>BootingWEBrick=>Rails3.2.1applicationstartingindevelopmentonhttp://0.0.0.0:3000=>Callwith-dtodetach=>Ctrl-CtoshutdownserverExiting/Users/vinayshenoy/.rvm/gems/ruby-1.9.3-p0/gems/actionmailer-3.2.1/lib/action_mailer

  8. ruby - Highline 询问方法不会使用同一行 - 2

    设置:狂欢ruby1.9.2高线(1.6.13)描述:我已经相当习惯在其他一些项目中使用highline,但已经有几个月没有使用它了。现在,在Ruby1.9.2上全新安装时,它似乎不允许在同一行回答提示。所以以前我会看到类似的东西:require"highline/import"ask"Whatisyourfavoritecolor?"并得到:Whatisyourfavoritecolor?|现在我看到类似的东西:Whatisyourfavoritecolor?|竖线(|)符号是我的终端光标。知道为什么会发生这种变化吗? 最佳答案

  9. ruby - 主要 :Object when running build from sublime 的未定义方法 `require_relative' - 2

    我已经从我的命令行中获得了一切,所以我可以运行rubymyfile并且它可以正常工作。但是当我尝试从sublime中运行它时,我得到了undefinedmethod`require_relative'formain:Object有人知道我的sublime设置中缺少什么吗?我正在使用OSX并安装了rvm。 最佳答案 或者,您可以只使用“require”,它应该可以正常工作。我认为“require_relative”仅适用于ruby​​1.9+ 关于ruby-主要:Objectwhenrun

  10. ruby - 多个属性的 update_column 方法 - 2

    我有一个具有一些属性的模型:attr1、attr2和attr3。我需要在不执行回调和验证的情况下更新此属性。我找到了update_column方法,但我想同时更新三个属性。我需要这样的东西:update_columns({attr1:val1,attr2:val2,attr3:val3})代替update_column(attr1,val1)update_column(attr2,val2)update_column(attr3,val3) 最佳答案 您可以使用update_columns(attr1:val1,attr2:val2

随机推荐