草庐IT

Java多种方法实现等待所有子线程完成再继续执行

南瓜慢说 www.pkslow.com 2023-03-28 原文

简介

在现实世界中,我们常常需要等待其它任务完成,才能继续执行下一步。Java实现等待子线程完成再继续执行的方式很多。我们来一一查看一下。

Thread的join方法

该方法是Thread提供的方法,调用join()时,会阻塞主线程,等该Thread完成才会继续执行,代码如下:

private static void threadJoin() {
  List<Thread> threads = new ArrayList<>();

  for (int i = 0; i < NUM; i++) {
    Thread t = new Thread(new PkslowTask("Task " + i));
    t.start();
    threads.add(t);
  }
  threads.forEach(t -> {
    try {
      t.join();
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
  });

  System.out.println("threadJoin Finished All Tasks...");

}

结果:

Task 6 is running
Task 9 is running
Task 3 is running
Task 4 is running
Task 7 is running
Task 0 is running
Task 2 is running
Task 1 is running
Task 5 is running
Task 8 is running
Task 1 is completed
Task 8 is completed
Task 6 is completed
Task 4 is completed
Task 3 is completed
Task 0 is completed
Task 7 is completed
Task 9 is completed
Task 2 is completed
Task 5 is completed
threadJoin Finished All Tasks...

CountDownLatch

CountDownLatch是一个很好用的并发工具,初始化时要指定线程数,如10。在子线程调用countDown()时计数减1。直到为0时,await()方法才不会阻塞。代码如下:

private static void countDownLatch() {
  CountDownLatch latch = new CountDownLatch(NUM);
  for (int i = 0; i < NUM; i++) {
    Thread t = new Thread(() -> {
      System.out.println("countDownLatch running...");
      try {
        Thread.sleep(1000);
        System.out.println("countDownLatch Finished...");
        latch.countDown();
      } catch (InterruptedException e) {
        throw new RuntimeException(e);
      }
    });
    t.start();
  }

  try {
    latch.await();
  } catch (InterruptedException e) {
    throw new RuntimeException(e);
  }
  System.out.println("countDownLatch Finished All Tasks...");
}

结果:

countDownLatch running...
countDownLatch running...
countDownLatch running...
countDownLatch running...
countDownLatch running...
countDownLatch running...
countDownLatch running...
countDownLatch running...
countDownLatch running...
countDownLatch running...
countDownLatch Finished...
countDownLatch Finished...
countDownLatch Finished...
countDownLatch Finished...
countDownLatch Finished...
countDownLatch Finished...
countDownLatch Finished...
countDownLatch Finished...
countDownLatch Finished...
countDownLatch Finished...
countDownLatch Finished All Tasks...

CyclicBarrier

CyclicBarrier与CountDownLatch类似,但CyclicBarrier可重置,可重用。代码如下:

private static void cyclicBarrier() {
  CyclicBarrier barrier = new CyclicBarrier(NUM + 1);

  for (int i = 0; i < NUM; i++) {
    Thread t = new Thread(() -> {
      System.out.println("cyclicBarrier running...");
      try {
        Thread.sleep(1000);
        System.out.println("cyclicBarrier Finished...");
        barrier.await();
      } catch (InterruptedException | BrokenBarrierException e) {
        throw new RuntimeException(e);
      }
    });
    t.start();
  }

  try {
    barrier.await();
  } catch (InterruptedException | BrokenBarrierException e) {
    throw new RuntimeException(e);
  }
  System.out.println("cyclicBarrier Finished All Tasks...");
}

结果:

cyclicBarrier running...
cyclicBarrier running...
cyclicBarrier running...
cyclicBarrier running...
cyclicBarrier running...
cyclicBarrier running...
cyclicBarrier running...
cyclicBarrier running...
cyclicBarrier running...
cyclicBarrier running...
cyclicBarrier Finished...
cyclicBarrier Finished...
cyclicBarrier Finished...
cyclicBarrier Finished...
cyclicBarrier Finished...
cyclicBarrier Finished...
cyclicBarrier Finished...
cyclicBarrier Finished...
cyclicBarrier Finished...
cyclicBarrier Finished...
cyclicBarrier Finished All Tasks...

executorService.isTerminated()

ExecutorService调用shutdown()方法后,可以通过方法isTerminated()来判断任务是否完成。代码如下:

private static void executeServiceIsTerminated() {
  ExecutorService executorService = Executors.newFixedThreadPool(THREADS);
  IntStream.range(0, NUM)
    .forEach(i -> executorService.execute(new PkslowTask("Task " + i)));
  executorService.shutdown();
  while (!executorService.isTerminated()) {
    //waiting...
  }
  System.out.println("executeServiceIsTerminated Finished All Tasks...");

}

结果:

Task 0 is running
Task 2 is running
Task 1 is running
Task 3 is running
Task 4 is running
Task 0 is completed
Task 2 is completed
Task 5 is running
Task 4 is completed
Task 7 is running
Task 3 is completed
Task 1 is completed
Task 8 is running
Task 6 is running
Task 9 is running
Task 5 is completed
Task 9 is completed
Task 7 is completed
Task 6 is completed
Task 8 is completed
executeServiceIsTerminated Finished All Tasks...

executorService.awaitTermination

executorService.awaitTermination方法会等待任务完成,并给一个超时时间,代码如下:

private static void executeServiceAwaitTermination() {
  ExecutorService executorService = Executors.newFixedThreadPool(THREADS);
  IntStream.range(0, NUM)
    .forEach(i -> executorService.execute(new PkslowTask("Task " + i)));
  executorService.shutdown();

  try {
    if (!executorService.awaitTermination(1, TimeUnit.MINUTES)) {
      executorService.shutdownNow();
    }
  } catch (InterruptedException e) {
    throw new RuntimeException(e);
  }
  System.out.println("executeServiceAwaitTermination Finished All Tasks...");
}

结果:

Task 0 is running
Task 1 is running
Task 2 is running
Task 3 is running
Task 4 is running
Task 0 is completed
Task 5 is running
Task 1 is completed
Task 4 is completed
Task 7 is running
Task 3 is completed
Task 8 is running
Task 2 is completed
Task 9 is running
Task 6 is running
Task 5 is completed
Task 7 is completed
Task 9 is completed
Task 8 is completed
Task 6 is completed
executeServiceAwaitTermination Finished All Tasks...

executorService.invokeAll

使用invokeAll提交所有任务,代码如下:

private static void executeServiceInvokeAll() {
  ExecutorService executorService = Executors.newFixedThreadPool(THREADS);
  List<Callable<Void>> tasks = new ArrayList<>();

  IntStream.range(0, NUM)
    .forEach(i -> tasks.add(new PkslowTask("Task " + i)));

  try {
    executorService.invokeAll(tasks);
  } catch (InterruptedException e) {
    throw new RuntimeException(e);
  }

  executorService.shutdown();
  System.out.println("executeServiceInvokeAll Finished All Tasks...");
}

结果:

Task 1 is running
Task 2 is running
Task 0 is running
Task 3 is running
Task 4 is running
Task 1 is completed
Task 3 is completed
Task 0 is completed
Task 2 is completed
Task 4 is completed
Task 8 is running
Task 5 is running
Task 6 is running
Task 9 is running
Task 7 is running
Task 8 is completed
Task 5 is completed
Task 6 is completed
Task 9 is completed
Task 7 is completed
executeServiceInvokeAll Finished All Tasks...

ExecutorCompletionService

ExecutorCompletionService通过take()方法,会返回最早完成的任务,代码如下:

private static void executorCompletionService() {
  ExecutorService executorService = Executors.newFixedThreadPool(10);
  CompletionService<String> service = new ExecutorCompletionService<>(executorService);

  List<Callable<String>> callables = new ArrayList<>();
  callables.add(new DelayedCallable(2000, "2000ms"));
  callables.add(new DelayedCallable(1500, "1500ms"));
  callables.add(new DelayedCallable(6000, "6000ms"));
  callables.add(new DelayedCallable(2500, "2500ms"));
  callables.add(new DelayedCallable(300, "300ms"));
  callables.add(new DelayedCallable(3000, "3000ms"));
  callables.add(new DelayedCallable(1100, "1100ms"));
  callables.add(new DelayedCallable(100, "100ms"));
  callables.add(new DelayedCallable(100, "100ms"));
  callables.add(new DelayedCallable(100, "100ms"));

  callables.forEach(service::submit);

  for (int i = 0; i < NUM; i++) {
    try {
      Future<String> future = service.take();
      System.out.println(future.get() + " task is completed");
    } catch (InterruptedException | ExecutionException e) {
      throw new RuntimeException(e);
    }
  }

  System.out.println("executorCompletionService Finished All Tasks...");

  executorService.shutdown();
  awaitTerminationAfterShutdown(executorService);
}

这里不同任务的时长是不一样的,但会先返回最早完成的任务:

2000ms is running
2500ms is running
300ms is running
1500ms is running
6000ms is running
3000ms is running
1100ms is running
100ms is running
100ms is running
100ms is running
100ms is completed
100ms is completed
100ms task is completed
100ms task is completed
100ms is completed
100ms task is completed
300ms is completed
300ms task is completed
1100ms is completed
1100ms task is completed
1500ms is completed
1500ms task is completed
2000ms is completed
2000ms task is completed
2500ms is completed
2500ms task is completed
3000ms is completed
3000ms task is completed
6000ms is completed
6000ms task is completed
executorCompletionService Finished All Tasks...

代码

代码请看GitHub: https://github.com/LarryDpk/pkslow-samples

有关Java多种方法实现等待所有子线程完成再继续执行的更多相关文章

  1. ruby - 如何使用 Nokogiri 的 xpath 和 at_xpath 方法 - 2

    我正在学习如何使用Nokogiri,根据这段代码我遇到了一些问题:require'rubygems'require'mechanize'post_agent=WWW::Mechanize.newpost_page=post_agent.get('http://www.vbulletin.org/forum/showthread.php?t=230708')puts"\nabsolutepathwithtbodygivesnil"putspost_page.parser.xpath('/html/body/div/div/div/div/div/table/tbody/tr/td/div

  2. ruby - 如何从 ruby​​ 中的字符串运行任意对象方法? - 2

    总的来说,我对ruby​​还比较陌生,我正在为我正在创建的对象编写一些rspec测试用例。许多测试用例都非常基础,我只是想确保正确填充和返回值。我想知道是否有办法使用循环结构来执行此操作。不必为我要测试的每个方法都设置一个assertEquals。例如:describeitem,"TestingtheItem"doit"willhaveanullvaluetostart"doitem=Item.new#HereIcoulddotheitem.name.shouldbe_nil#thenIcoulddoitem.category.shouldbe_nilendend但我想要一些方法来使用

  3. ruby - 为什么我可以在 Ruby 中使用 Object#send 访问私有(private)/ protected 方法? - 2

    类classAprivatedeffooputs:fooendpublicdefbarputs:barendprivatedefzimputs:zimendprotecteddefdibputs:dibendendA的实例a=A.new测试a.foorescueputs:faila.barrescueputs:faila.zimrescueputs:faila.dibrescueputs:faila.gazrescueputs:fail测试输出failbarfailfailfail.发送测试[:foo,:bar,:zim,:dib,:gaz].each{|m|a.send(m)resc

  4. ruby - Facter::Util::Uptime:Module 的未定义方法 get_uptime (NoMethodError) - 2

    我正在尝试设置一个puppet节点,但ruby​​gems似乎不正常。如果我通过它自己的二进制文件(/usr/lib/ruby/gems/1.8/gems/facter-1.5.8/bin/facter)在cli上运行facter,它工作正常,但如果我通过由ruby​​gems(/usr/bin/facter)安装的二进制文件,它抛出:/usr/lib/ruby/1.8/facter/uptime.rb:11:undefinedmethod`get_uptime'forFacter::Util::Uptime:Module(NoMethodError)from/usr/lib/ruby

  5. ruby-openid:执行发现时未设置@socket - 2

    我在使用omniauth/openid时遇到了一些麻烦。在尝试进行身份验证时,我在日志中发现了这一点:OpenID::FetchingError:Errorfetchinghttps://www.google.com/accounts/o8/.well-known/host-meta?hd=profiles.google.com%2Fmy_username:undefinedmethod`io'fornil:NilClass重要的是undefinedmethodio'fornil:NilClass来自openid/fetchers.rb,在下面的代码片段中:moduleNetclass

  6. Ruby 方法() 方法 - 2

    我想了解Ruby方法methods()是如何工作的。我尝试使用“ruby方法”在Google上搜索,但这不是我需要的。我也看过ruby​​-doc.org,但我没有找到这种方法。你能详细解释一下它是如何工作的或者给我一个链接吗?更新我用methods()方法做了实验,得到了这样的结果:'labrat'代码classFirstdeffirst_instance_mymethodenddefself.first_class_mymethodendendclassSecond使用类#returnsavailablemethodslistforclassandancestorsputsSeco

  7. ruby - 如何以所有可能的方式将字符串拆分为长度最多为 3 的连续子字符串? - 2

    我试图获取一个长度在1到10之间的字符串,并输出将字符串分解为大小为1、2或3的连续子字符串的所有可能方式。例如:输入:123456将整数分割成单个字符,然后继续查找组合。该代码将返回以下所有数组。[1,2,3,4,5,6][12,3,4,5,6][1,23,4,5,6][1,2,34,5,6][1,2,3,45,6][1,2,3,4,56][12,34,5,6][12,3,45,6][12,3,4,56][1,23,45,6][1,2,34,56][1,23,4,56][12,34,56][123,4,5,6][1,234,5,6][1,2,345,6][1,2,3,456][123

  8. ruby-on-rails - Rails 3.2.1 中 ActionMailer 中的未定义方法 'default_content_type=' - 2

    我在我的项目中添加了一个系统来重置用户密码并通过电子邮件将密码发送给他,以防他忘记密码。昨天它运行良好(当我实现它时)。当我今天尝试启动服务器时,出现以下错误。=>BootingWEBrick=>Rails3.2.1applicationstartingindevelopmentonhttp://0.0.0.0:3000=>Callwith-dtodetach=>Ctrl-CtoshutdownserverExiting/Users/vinayshenoy/.rvm/gems/ruby-1.9.3-p0/gems/actionmailer-3.2.1/lib/action_mailer

  9. ruby - Highline 询问方法不会使用同一行 - 2

    设置:狂欢ruby1.9.2高线(1.6.13)描述:我已经相当习惯在其他一些项目中使用highline,但已经有几个月没有使用它了。现在,在Ruby1.9.2上全新安装时,它似乎不允许在同一行回答提示。所以以前我会看到类似的东西:require"highline/import"ask"Whatisyourfavoritecolor?"并得到:Whatisyourfavoritecolor?|现在我看到类似的东西:Whatisyourfavoritecolor?|竖线(|)符号是我的终端光标。知道为什么会发生这种变化吗? 最佳答案

  10. ruby - 主要 :Object when running build from sublime 的未定义方法 `require_relative' - 2

    我已经从我的命令行中获得了一切,所以我可以运行rubymyfile并且它可以正常工作。但是当我尝试从sublime中运行它时,我得到了undefinedmethod`require_relative'formain:Object有人知道我的sublime设置中缺少什么吗?我正在使用OSX并安装了rvm。 最佳答案 或者,您可以只使用“require”,它应该可以正常工作。我认为“require_relative”仅适用于ruby​​1.9+ 关于ruby-主要:Objectwhenrun

随机推荐