草庐IT

java - 即时向 Java 8 并行流添加元素

coder 2024-03-14 原文

目标是在 Java 8 流的帮助下处理连续的元素流。因此,在处理该流时,将元素添加到并行流的数据源。

Javadoc of Streams在“非干扰”部分描述了以下属性:

For most data sources, preventing interference means ensuring that the data source is not modified at all during the execution of the stream pipeline. The notable exception to this are streams whose sources are concurrent collections, which are specifically designed to handle concurrent modification. Concurrent stream sources are those whose Spliterator reports the CONCURRENT characteristic.

这就是我们尝试使用 ConcurrentLinkedQueue 的原因,它返回 true

new ConcurrentLinkedQueue<Integer>().spliterator().hasCharacteristics(Spliterator.CONCURRENT)

没有明确说明,在并行流中使用时不得修改数据源。

在我们的示例中,对于流中的每个元素,递增的计数器值被添加到队列中,队列是流的数据源,直到计数器大于 N。 通过调用 queue.stream() 在顺序执行时一切正常:

import static org.junit.Assert.assertEquals;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public class StreamTest {
    public static void main(String[] args) {
        final int N = 10000;
        assertEquals(N, testSequential(N));
    }

    public static int testSequential(int N) {
        final AtomicInteger counter = new AtomicInteger(0);
        final AtomicInteger check = new AtomicInteger(0);
        final Queue<Integer> queue = new ConcurrentLinkedQueue<Integer>();

        for (int i = 0; i < N / 10; ++i) {
            queue.add(counter.incrementAndGet());
        }

        Stream<Integer> stream = queue.stream();
        stream.forEach(i -> {
            System.out.println(i);

            int j = counter.incrementAndGet();

            check.incrementAndGet();
            if (j <= N) {
                queue.add(j);
            }
        });
        stream.close();
        return check.get();
    }
}

作为第二次尝试,流是并行的并抛出 java.lang.AssertionError,因为检查小于 N 并且不是队列中的每个元素都被处理。流可能提前完成执行,因为队列可能在某个时间点变空了。

import static org.junit.Assert.assertEquals;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public class StreamTest {
    public static void main(String[] args) {
        final int N = 10000;
        assertEquals(N, testParallel1(N));
    }

    public static int testParallel1(int N) {
        final AtomicInteger counter = new AtomicInteger(0);
        final AtomicInteger check = new AtomicInteger(0);
        final Queue<Integer> queue = new ConcurrentLinkedQueue<Integer>();

        for (int i = 0; i < N / 10; ++i) {
            queue.add(counter.incrementAndGet());
        }

        Stream<Integer> stream = queue.parallelStream();
        stream.forEach(i -> {
            System.out.println(i);

            int j = counter.incrementAndGet();

            check.incrementAndGet();
            if (j <= N) {
                queue.add(j);
            }
        });
        stream.close();
        return check.get();
    }
}

下一次尝试是在连续流“真正”结束(队列为空)后向主线程发出信号,然后关闭流对象。这里的问题是,流对象似乎只从队列中读取元素一次,或者至少不是连续读取元素,并且永远不会到达流的“真正”末端。

import static org.junit.Assert.assertEquals;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Stream;

public class StreamTest {

    public static void main(String[] args) {
        final int N = 10000;
        try {
            assertEquals(N, testParallel2(N));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static int testParallel2(int N) throws InterruptedException {
        final Lock lock = new ReentrantLock();
        final Condition cond = lock.newCondition();

        final AtomicInteger counter = new AtomicInteger(0);
        final AtomicInteger check = new AtomicInteger(0);
        final Queue<Integer> queue = new ConcurrentLinkedQueue<Integer>();

        for (int i = 0; i < N / 10; ++i) {
            queue.add(counter.incrementAndGet());
        }

        Stream<Integer> stream = queue.parallelStream();
        stream.forEach(i -> {
            System.out.println(i);

            int j = counter.incrementAndGet();

            lock.lock();
            check.incrementAndGet();
            if (j <= N) {
                queue.add(j);
            } else {
                cond.signal();
            }
            lock.unlock();
        });

        lock.lock();
        while (check.get() < N) {
            cond.await();
        }
        lock.unlock();
        stream.close();
        return check.get();
    }
}

由此产生的问题是:

  • 我们做错了什么吗?
  • 是否是 Stream API 的未指定甚至错误使用?
  • 否则我们如何才能实现预期的行为?

最佳答案

“修改 Stream 的源代码不会破坏它”与您的假设“修改将反射(reflect)在正在进行的 Stream 操作中”之间存在显着差异。

CONCURRENT 属性意味着源的修改是允许的,即它永远不会抛出 ConcurrentModificationException,但它不会暗示您可以依赖特定行为来判断是否反射(reflect)了这些更改。

documentation of the CONCURRENT flag本身说:

Most concurrent collections maintain a consistency policy guaranteeing accuracy with respect to elements present at the point of Spliterator construction, but possibly not reflecting subsequent additions or removals.

此 Stream 行为与已知的 ConcurrentLinkedQueue 行为一致:

Iterators are weakly consistent, returning elements reflecting the state of the queue at some point at or since the creation of the iterator. They do not throw ConcurrentModificationException, and may proceed concurrently with other operations. Elements contained in the queue since the creation of the iterator will be returned exactly once.

很难说如何“以其他方式实现所需的行为”,因为您没有以代码以外的任何形式描述“所需的行为”,代码可以简单地替换为

public static int testSequential(int N) {
    return N;
}
public static int testParallel1(int N) {
    return N;
}

因为这是唯一可观察到的效果……考虑 redefining your problem ……

关于java - 即时向 Java 8 并行流添加元素,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40085379/

有关java - 即时向 Java 8 并行流添加元素的更多相关文章

  1. ruby - 我需要将 Bundler 本身添加到 Gemfile 中吗? - 2

    当我使用Bundler时,是否需要在我的Gemfile中将其列为依赖项?毕竟,我的代码中有些地方需要它。例如,当我进行Bundler设置时:require"bundler/setup" 最佳答案 没有。您可以尝试,但首先您必须用鞋带将自己抬离地面。 关于ruby-我需要将Bundler本身添加到Gemfile中吗?,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/4758609/

  2. ruby - 将 Bootstrap Less 添加到 Sinatra - 2

    我有一个ModularSinatra应用程序,我正在尝试将Bootstrap添加到应用程序中。get'/bootstrap/application.css'doless:"bootstrap/bootstrap"end我在views/bootstrap中有所有less文件,包括bootstrap.less。我收到这个错误:Less::ParseErrorat/bootstrap/application.css'reset.less'wasn'tfound.Bootstrap.less的第一行是://CSSReset@import"reset.less";我尝试了所有不同的路径格式,但它

  3. ruby - 续集在添加关联时访问many_to_many连接表 - 2

    我正在使用Sequel构建一个愿望list系统。我有一个wishlists和itemstable和一个items_wishlists连接表(该名称是续集选择的名称)。items_wishlists表还有一个用于facebookid的额外列(因此我可以存储opengraph操作),这是一个NOTNULL列。我还有Wishlist和Item具有续集many_to_many关联的模型已建立。Wishlist类也有:selectmany_to_many关联的选项设置为select:[:items.*,:items_wishlists__facebook_action_id].有没有一种方法可以

  4. java - 等价于 Java 中的 Ruby Hash - 2

    我真的很习惯使用Ruby编写以下代码:my_hash={}my_hash['test']=1Java中对应的数据结构是什么? 最佳答案 HashMapmap=newHashMap();map.put("test",1);我假设? 关于java-等价于Java中的RubyHash,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/22737685/

  5. ruby - 即时确定方法的可见性 - 2

    我正在编写一个方法,它将在一个类中定义一个实例方法;类似于attr_accessor:classFoocustom_method(:foo)end我通过将custom_method函数添加到Module模块并使用define_method定义方法来实现它,效果很好。但我无法弄清楚如何考虑类(class)的可见性属性。例如,在下面的类中classFoocustom_method(:foo)privatecustom_method(:bar)end第一个生成的方法(foo)必须是公共(public)的,第二个(bar)必须是私有(private)的。我怎么做?或者,如何找到调用我的cust

  6. java - 从 JRuby 调用 Java 类的问题 - 2

    我正在尝试使用boilerpipe来自JRuby。我看过guide从JRuby调用Java,并成功地将它与另一个Java包一起使用,但无法弄清楚为什么同样的东西不能用于boilerpipe。我正在尝试基本上从JRuby中执行与此Java等效的操作:URLurl=newURL("http://www.example.com/some-location/index.html");Stringtext=ArticleExtractor.INSTANCE.getText(url);在JRuby中试过这个:require'java'url=java.net.URL.new("http://www

  7. ruby - 可以通过多少种方法将方法添加到 ruby​​ 对象? - 2

    当谈到运行时自省(introspection)和动态代码生成时,我认为ruby​​没有任何竞争对手,可能除了一些lisp方言。前几天,我正在做一些代码练习来探索ruby​​的动态功能,我开始想知道如何向现有对象添加方法。以下是我能想到的3种方法:obj=Object.new#addamethoddirectlydefobj.new_method...end#addamethodindirectlywiththesingletonclassclass这只是冰山一角,因为我还没有探索instance_eval、module_eval和define_method的各种组合。是否有在线/离线资

  8. ruby - 如何在 Ruby 中向现有方法定义添加语句 - 2

    我注意到类定义,如果我打开classMyClass,并在不覆盖的情况下添加一些东西我仍然得到了之前定义的原始方法。添加的新语句扩充了现有语句。但是对于方法定义,我仍然想要与类定义相同的行为,但是当我打开defmy_method时似乎,def中的现有语句和end被覆盖了,我需要重写一遍。那么有什么方法可以使方法定义的行为与定义相同,类似于super,但不一定是子类? 最佳答案 我想您正在寻找alias_method:classAalias_method:old_func,:funcdeffuncold_func#similartoca

  9. ruby-on-rails - 添加回形针新样式不影响旧上传的图像 - 2

    我有带有Logo图像的公司模型has_attached_file:logo我用他们的Logo创建了许多公司。现在,我需要添加新样式has_attached_file:logo,:styles=>{:small=>"30x15>",:medium=>"155x85>"}我是否应该重新上传所有旧数据以重新生成新样式?我不这么认为……或者有什么rake任务可以重新生成样式吗? 最佳答案 参见Thumbnail-Generation.如果rake任务不适合你,你应该能够在控制台中使用一个片段来调用重新处理!关于相关公司

  10. ruby - 在哈希的键数组中追加元素 - 2

    查看我的Ruby代码:h=Hash.new([])h[0]=:word1h[1]=h[1]输出是:Hash={0=>:word1,1=>[:word2,:word3],2=>[:word2,:word3]}我希望有Hash={0=>:word1,1=>[:word2],2=>[:word3]}为什么要附加第二个哈希元素(数组)?如何将新数组元素附加到第三个哈希元素? 最佳答案 如果您提供单个值作为Hash.new的参数(例如Hash.new([]),完全相同的对象将用作每个缺失键的默认值。这就是您所拥有的,那是你不想要的。您可以改用

随机推荐