草庐IT

java - 嵌套的 Java 8 并行 forEach 循环执行不佳。这种行为是预期的吗?

coder 2023-05-19 原文

注意:我已经在另一篇 SO 帖子中解决了这个问题 - Using a semaphore inside a nested Java 8 parallel stream action may DEADLOCK. Is this a bug? - 但这篇文章的标题暗示问题与信号量的使用有关 - 这有点分散了讨论的注意力。我创建这个是为了强调嵌套循环可能存在性能问题——尽管这两个问题可能有一个共同的原因(也许是因为我花了很多时间来解决这个问题)。 (我不认为它是重复的,因为它强调了另一种症状 - 但如果你确实删除它)。

问题:如果嵌套两个Java 8 stream.parallel().forEach 循环并且所有任务都是独立的、无状态的等等——除了提交到公共(public)FJ池——那么嵌套并行循环内的并行循环的性能比将顺序循环嵌套在并行循环内要差得多。更糟糕的是:如果包含内部循环的操作是同步的,你会得到一个死锁。

性能问题演示

没有“同步”,您仍然可以观察到性能问题。您可以在以下位置找到演示代码:http://svn.finmath.net/finmath%20experiments/trunk/src/net/finmath/experiments/concurrency/NestedParallelForEachTest.java (有关更详细的说明,请参阅那里的 JavaDoc)。

我们这里的设置如下:我们有一个嵌套的 stream.parallel().forEach()。

  • 内部循环是独立的(无状态、无干扰等 - 使用公共(public)池除外)并且在最坏的情况下总共消耗 1 秒,即如果按顺序处理。
  • 外循环的一半任务在该循环前 10 秒消耗。
  • 在该循环后 10 秒消耗一半。
  • 因此每个线程总共消耗 11 秒(最坏情况)。 * 我们有一个 boolean 值,它允许将内部循环从并行()切换到顺序()。

现在:将 24 个外循环任务提交到并行度为 8 的池中,我们预计最多 24/8 * 11 = 33 秒(在 8 核或更好的机器上)。

结果是:

  • 使用内部顺序循环:33 秒。
  • 使用内部并行循环:>80 秒(我有 92 秒)。

问题:您能确认一下这种行为吗?这是人们对框架的期望吗? (我现在更小心了,声称这是一个错误,但我个人认为这是由于 ForkJoinTask 的实现中的一个错误。备注:我已将此发布到并发兴趣(参见 http://cs.oswego.edu/pipermail/concurrency-interest/2014-May/012652.html) ,但到目前为止我还没有从那里得到确认)。

僵局演示

下面的代码会死锁

    // Outer loop
    IntStream.range(0,numberOfTasksInOuterLoop).parallel().forEach(i -> {
        doWork();
        synchronized(this) {
            // Inner loop
            IntStream.range(0,numberOfTasksInInnerLoop).parallel().forEach(j -> {
                doWork();
            });
        }
    });

其中 numberOfTasksInOuterLoop = 24, numberOfTasksInInnerLoop = 240, outerLoopOverheadFactor = 10000doWork 是一些无状态的 CPU 刻录机.

您可以在 http://svn.finmath.net/finmath%20experiments/trunk/src/net/finmath/experiments/concurrency/NestedParallelForEachAndSynchronization.java 找到完整的演示代码 (有关更详细的说明,请参阅那里的 JavaDoc)。

这是预期的行为吗?请注意,有关 Java 并行流的文档没有提到任何嵌套或同步问题。此外,没有提到两者都使用共同的 fork 连接池这一事实。

更新

另一个关于性能问题的测试可以在 http://svn.finmath.net/finmath%20experiments/trunk/src/net/finmath/experiments/concurrency/NestedParallelForEachBenchmark.java 找到。 - 这个测试没有任何阻塞操作(没有 Thread.sleep 和不同步)。我在这里整理了一些评论:http://christian-fries.de/blog/files/2014-nested-java-8-parallel-foreach.html

更新 2

似乎这个问题和更严重的信号量死锁已经在 J​​ava8 u40 中得到修复。

最佳答案

问题是你配置的相当有限的并行度被外部流处理吃掉了:如果你说你想要八个线程并使用 parallel() 处理超过八个项目的流> 它将创建八个工作线程并让它们处理项目。

然后在您的消费者中,您正在使用 parallel() 处理另一个流,但没有剩余的工作线程。由于工作线程在等待内部流处理结束时被阻塞,因此 ForkJoinPool 必须创建违反您配置的并行度的新工作线程。在我看来,它不会回收这些扩展线程,而是让它们在处理后立即死亡。因此,在您的内部处理中,会创建和处理新线程,这是一项昂贵的操作。

您可能会将其视为一个缺陷,即启动线程不参与并行流处理的计算,而只是等待结果,但即使已修复,您仍然会遇到一个很难解决的一般问题(如果有的话)修复:

每当工作线程与外部流项目的数量之间的比率较低时,实现会将它们全部用于外部流,因为它不知道流是外部流。因此,并行执行内部流请求的工作线程比可用的多。使用调用者线程参与计算可以修复它,使其性能等于串行计算,但在这里获得并行执行的优势不适用于固定数量的工作线程的概念。

请注意,您在这里只是触及了这个问题的表面,因为您对项目的处理时间相当平衡。如果内部项和外部项的处理都出现分歧(与同一级别的项相比),问题将更加严重。


更新:通过分析和查看代码,似乎 ForkJoinPool 确实 尝试使用等待线程进行“工作窃取”,但根据具体情况使用不同的代码Thread 是工作线程还是其他线程。结果,一个工作线程实际上大约有 80% 的时间在等待,并且几乎没有做任何工作,而其他线程确实对计算做出了贡献……


更新 2:为了完整起见,这里是注释中描述的简单并行执行方法。由于它将每个项目排入队列,因此当单个项目的执行时间相当短时,预计会有很多开销。所以这不是一个复杂的解决方案,而是一个演示,它可以在没有太多魔法的情况下处理长时间运行的任务……

import java.lang.reflect.UndeclaredThrowableException;
import java.util.concurrent.*;
import java.util.function.IntConsumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class NestedParallelForEachTest1 {
    static final boolean isInnerStreamParallel = true;

    // Setup: Inner loop task 0.01 sec in worse case. Outer loop task: 10 sec + inner loop. This setup: (100 * 0.01 sec + 10 sec) * 24/8 = 33 sec.
    static final int numberOfTasksInOuterLoop = 24;  // In real applications this can be a large number (e.g. > 1000).
    static final int numberOfTasksInInnerLoop = 100; // In real applications this can be a large number (e.g. > 1000).
    static final int concurrentExecutionsLimitForStreams = 8;

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        System.out.println(System.getProperty("java.version")+" "+System.getProperty("java.home"));
        new NestedParallelForEachTest1().testNestedLoops();
        E.shutdown();
    }

    final static ThreadPoolExecutor E = new ThreadPoolExecutor(
        concurrentExecutionsLimitForStreams, concurrentExecutionsLimitForStreams,
        2, TimeUnit.MINUTES, new SynchronousQueue<>(), (r,e)->r.run() );

    public static void parallelForEach(IntStream s, IntConsumer c) {
        s.mapToObj(i->E.submit(()->c.accept(i))).collect(Collectors.toList())
         .forEach(NestedParallelForEachTest1::waitOrHelp);
    }
    static void waitOrHelp(Future f) {
        while(!f.isDone()) {
            Runnable r=E.getQueue().poll();
            if(r!=null) r.run();
        }
        try { f.get(); }
        catch(InterruptedException ex) { throw new RuntimeException(ex); }
        catch(ExecutionException eex) {
            Throwable t=eex.getCause();
            if(t instanceof RuntimeException) throw (RuntimeException)t;
            if(t instanceof Error) throw (Error)t;
            throw new UndeclaredThrowableException(t);
        }
    }
    public void testNestedLoops(NestedParallelForEachTest1 this) {
        long start = System.nanoTime();
        // Outer loop
        parallelForEach(IntStream.range(0,numberOfTasksInOuterLoop), i -> {
            if(i < 10) sleep(10 * 1000);
            if(isInnerStreamParallel) {
                // Inner loop as parallel: worst case (sequential) it takes 10 * numberOfTasksInInnerLoop millis
                parallelForEach(IntStream.range(0,numberOfTasksInInnerLoop), j -> sleep(10));
            }
            else {
                // Inner loop as sequential
                IntStream.range(0,numberOfTasksInInnerLoop).sequential().forEach(j -> sleep(10));
            }
            if(i >= 10) sleep(10 * 1000);
        });
        long end = System.nanoTime();
        System.out.println("Done in "+TimeUnit.NANOSECONDS.toSeconds(end-start)+" sec.");
    }
    static void sleep(int milli) {
        try {
            Thread.sleep(milli);
        } catch (InterruptedException ex) {
            throw new AssertionError(ex);
        }
    }
}

关于java - 嵌套的 Java 8 并行 forEach 循环执行不佳。这种行为是预期的吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23489993/

有关java - 嵌套的 Java 8 并行 forEach 循环执行不佳。这种行为是预期的吗?的更多相关文章

  1. ruby-openid:执行发现时未设置@socket - 2

    我在使用omniauth/openid时遇到了一些麻烦。在尝试进行身份验证时,我在日志中发现了这一点:OpenID::FetchingError:Errorfetchinghttps://www.google.com/accounts/o8/.well-known/host-meta?hd=profiles.google.com%2Fmy_username:undefinedmethod`io'fornil:NilClass重要的是undefinedmethodio'fornil:NilClass来自openid/fetchers.rb,在下面的代码片段中:moduleNetclass

  2. ruby - 树顶语法无限循环 - 2

    我脑子里浮现出一些关于一种新编程语言的想法,所以我想我会尝试实现它。一位friend建议我尝试使用Treetop(Rubygem)来创建一个解析器。Treetop的文档很少,我以前从未做过这种事情。我的解析器表现得好像有一个无限循环,但没有堆栈跟踪;事实证明很难追踪到。有人可以指出入门级解析/AST指南的方向吗?我真的需要一些列出规则、常见用法等的东西来使用像Treetop这样的工具。我的语法分析器在GitHub上,以防有人希望帮助我改进它。class{initialize=lambda(name){receiver.name=name}greet=lambda{IO.puts("He

  3. ruby-on-rails - Rails 编辑表单不显示嵌套项 - 2

    我得到了一个包含嵌套链接的表单。编辑时链接字段为空的问题。这是我的表格:Editingkategori{:action=>'update',:id=>@konkurrancer.id})do|f|%>'Trackingurl',:style=>'width:500;'%>'Editkonkurrence'%>|我的konkurrencer模型:has_one:link我的链接模型:classLink我的konkurrancer编辑操作:defedit@konkurrancer=Konkurrancer.find(params[:id])@konkurrancer.link_attrib

  4. ruby-on-rails - 在 Ruby 中循环遍历多个数组 - 2

    我有多个ActiveRecord子类Item的实例数组,我需要根据最早的事件循环打印。在这种情况下,我需要打印付款和维护日期,如下所示:ItemAmaintenancerequiredin5daysItemBpaymentrequiredin6daysItemApaymentrequiredin7daysItemBmaintenancerequiredin8days我目前有两个查询,用于查找maintenance和payment项目(非排他性查询),并输出如下内容:paymentrequiredin...maintenancerequiredin...有什么方法可以改善上述(丑陋的)代

  5. ruby - 将散列转换为嵌套散列 - 2

    这道题是thisquestion的逆题.给定一个散列,每个键都有一个数组,例如{[:a,:b,:c]=>1,[:a,:b,:d]=>2,[:a,:e]=>3,[:f]=>4,}将其转换为嵌套哈希的最佳方法是什么{:a=>{:b=>{:c=>1,:d=>2},:e=>3,},:f=>4,} 最佳答案 这是一个迭代的解决方案,递归的解决方案留给读者作为练习:defconvert(h={})ret={}h.eachdo|k,v|node=retk[0..-2].each{|x|node[x]||={};node=node[x]}node[

  6. ruby - Chef 执行非顺序配方 - 2

    我遵循了教程http://gettingstartedwithchef.com/,第1章。我的运行list是"run_list":["recipe[apt]","recipe[phpap]"]我的phpapRecipe默认Recipeinclude_recipe"apache2"include_recipe"build-essential"include_recipe"openssl"include_recipe"mysql::client"include_recipe"mysql::server"include_recipe"php"include_recipe"php::modul

  7. java - 等价于 Java 中的 Ruby Hash - 2

    我真的很习惯使用Ruby编写以下代码:my_hash={}my_hash['test']=1Java中对应的数据结构是什么? 最佳答案 HashMapmap=newHashMap();map.put("test",1);我假设? 关于java-等价于Java中的RubyHash,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/22737685/

  8. ruby - RuntimeError(自动加载常量 Apps 多线程时检测到循环依赖 - 2

    我收到这个错误:RuntimeError(自动加载常量Apps时检测到循环依赖当我使用多线程时。下面是我的代码。为什么会这样?我尝试多线程的原因是因为我正在编写一个HTML抓取应用程序。对Nokogiri::HTML(open())的调用是一个同步阻塞调用,需要1秒才能返回,我有100,000多个页面要访问,所以我试图运行多个线程来解决这个问题。有更好的方法吗?classToolsController0)app.website=array.join(',')putsapp.websiteelseapp.website="NONE"endapp.saveapps=Apps.order("

  9. ruby - 为什么 Ruby 的 each 迭代器先执行? - 2

    我在用Ruby执行简单任务时遇到了一件奇怪的事情。我只想用每个方法迭代字母表,但迭代在执行中先进行:alfawit=("a".."z")puts"That'sanalphabet:\n\n#{alfawit.each{|litera|putslitera}}"这段代码的结果是:(缩写)abc⋮xyzThat'sanalphabet:a..z知道为什么它会这样工作或者我做错了什么吗?提前致谢。 最佳答案 因为您的each调用被插入到在固定字符串之前执行的字符串文字中。此外,each返回一个Enumerable,实际上您甚至打印它。试试

  10. Ruby——嵌套类和子类是一回事吗? - 2

    下面例子中的Nested和Child有什么区别?是否只是同一事物的不同语法?classParentclassNested...endendclassChild 最佳答案 不,它们是不同的。嵌套:Computer之外的“Processor”类只能作为Computer::Processor访问。嵌套为内部类(namespace)提供上下文。对于ruby​​解释器Computer和Computer::Processor只是两个独立的类。classComputerclassProcessor#Tocreateanobjectforthisc

随机推荐