草庐IT

并行机

全部标签

java - StreamEx.parallel().forEach() 在 .map() 之后不并行运行

我注意到,如果我使用StreamEx库通过自定义ForkJoinPool并行处理我的流,如下所示-后续操作会在该池的并行线程中运行。但是,如果我添加一个map()操作并并行生成流-仅使用池中的一个线程。下面是演示此问题的最小工作示例的完整代码(没有所有导入)。executeAsParallelFromList()和executeAsParallelAfterMap()方法之间的唯一区别是在.parallel()之前添加了.map(...)调用。importone.util.streamex.StreamEx;publicclassParallelExample{privatestati

java - Rabbit Mq java客户端并行消费

我想并行处理来自rabbitMq队列的消息。队列配置为autoAck=false。我正在使用camel-rabbitMQ支持camelendpoints,它支持threadPoolSize参数,但这没有达到预期的效果。即使在threadpoolsize=20时,消息仍会在队列外按顺序处理。通过代码调试,我可以看到threadpoolsize参数用于创建一个ExecutorService,该ExecutorService用于传递给rabbitconnectionfactory,如所述here.在您进入兔子ConsumerWorkService之前,这一切看起来都不错。这里的消息在最大大小

Java lambda、无状态 lambda 和并行执行

在尝试学习Javalambda时,我看到了一篇文章(在下面列出),其中在关于流API的局限性的部分中,他指出:“有状态的lambda在顺序执行时通常不是问题,但是当流执行是并行的,它会中断”。然后,他将这段代码作为执行顺序问题的示例:Listss=...;Listresult=...;Streamstream=ss.stream();stream.map(s->{synchronized(result){if(result.size(){});我可以看出如果它是并行化的,这将如何是不确定的,但我看不到的是你将如何使用无状态lambda来解决这个问题——将东西添加到一个以并行方式列出。一

java - 等待并行 RX 订阅者完成

我正在寻找在rx-java中等待异步任务完成的最佳方法。作为一个常见的例子,有一个函数从本地商店获取ID列表,然后查询远程系统以获取这些ID,然后将远程系统结果合并到一个报告中并返回给调用者功能。由于对远程系统的调用很慢,我们希望它们以异步方式完成,我只想在所有调用都已返回且结果已处理后返回。我发现执行此操作的唯一可靠方法是轮询订阅以检查它是否已取消订阅。但我认为这似乎不是做事的“RX”方式!作为示例,我使用了http://howrobotswork.wordpress.com/2013/10/28/using-rxjava-in-android/中的示例并对其进行了轻微修改,使其成为

java - 并行化 : What causes Java threads to block other than synchronization & I/O?

简短版本在标题中。长版:我正在研究一个使用Java进行科学优化的程序。程序的工作负载可以分为并行和串行阶段——并行阶段意味着正在执行高度并行化的工作。为了加速程序(它运行数小时/数天),我创建了多个线程,这些线程的数量等于我正在使用的机器上的CPU核心数量——通常是4或8个——并在它们之间分配工作。然后我启动这些线程并加入()它们,然后再进入串行阶段。到目前为止一切顺利。困扰我的是并行阶段的CPU利用率和加速比“理论最大值”还差得很远——例如如果我有4个内核,我希望看到350-400%的“利用率”(如top所报告),但它在180到310之间反弹。仅使用一个线程,我获得100%的CPU利

java - ConcurrentHashMap 并行度阈值

ConcurrentHashMap有几个新方法。我有两个关于他们的问题:为什么不在ConcurrentMap中声明它们?parallelismThreshold的具体含义或作用是什么? 最佳答案 这些新方法似乎依赖于特定于ConcurrentHashMap的实现细节,但您必须从Java8作者那里得到答案才能确定。(他们确实浏览SO)来自ConcurrentHashMap的Javadoc:ThesebulkoperationsacceptaparallelismThresholdargument.Methodsproceedseque

java - 为什么我应该在带有收集的并行流中使用并发特性?

为什么我应该在带有收集的并行流中使用并发特性:Listlist=Collections.synchronizedList(newArrayList(Arrays.asList(1,2,4)));Mapcollect=list.stream().parallel().collect(Collectors.toConcurrentMap(k->k,v->v,(c,c2)->c+c2));而不是:Mapcollect=list.stream().parallel().collect(Collectors.toMap(k->k,v->v,(c,c2)->c+c2));换句话说,不使用这个特性有

java - 如何获得 Files.walk 的并行流?

我需要递归地对文件夹中的所有文件进行一些只读处理。我正在使用Files.walk获取文件流,但我注意到api指定walk仅返回常规流,而不是并行流。如何并行处理目录中的所有文件? 最佳答案 您可以通过调用Stream::parallel将任何Stream转换为并行Stream。Streamstream=Files.walk(startPath).parallel().forEach(...); 关于java-如何获得Files.walk的并行流?,我们在StackOverflow上找到一

GPU并行效率问题——通过MPS提升GPU计算收益

现象描述使用V100_32G型号的GPU运行计算程序时,发现程序每5秒能够完成一次任务,耗费显存6G。鉴于V100GPU拥有32G的显存,还有很多空闲,决定同时运行多个计算程序,来提升GPU计算收益。然而,这一切都是想当然的。运行多个计算程序时,每个计算程序的处理耗时大大增加。例如,同时运行4个计算程序,则这些计算程序差不多需要20秒才能完成一次任务,几乎是单进程运行时的4倍,算上并行的收益,20秒能够处理4个任务,这和单进程的计算程序的运行效果几乎没有区别,也就是说,多进程并行和单进程运行完全没有效率的提升。单进程:5秒/任务4进程:20秒/任务问题原因一种可能的解释是,当前的计算程序对GP

java - Gradle:优化并行运行的测试

我正在试验Gradle并行运行测试的功能。我发现的主要设置是Test的maxParallelForks属性任务。我预计该设置的行为类似于Executors.newFixedThreadPool执行测试。也就是说,固定数量的线程(在Gradle的情况下是进程)正在并发执行;每当一个线程完成工作时,就会在池中激活一个新线程。但是,Gradle的行为以不太理想的方式根本不同。看起来Gradle将测试类分成数量等于maxParallelForks的组,然后Gradle为每个组生成一个进程并让这些进程并行执行。这种策略的问题很明显:它不能根据测试类所需的时间动态调整执行。例如,假设您有5个类,m