草庐IT

Java8中的Stream流

Mr_姚 2023-09-21 原文

定义

什么是Stream流,Java doc中是这样写的

A sequence of elements supporting sequential and parallel aggregate operations

翻译一下就是一个支持顺序和并行聚合操作的元素序列。
可以把它理解成一个迭代器,但是只能遍历一次,就像是流水一样,要处理的元素在流中传输,并且可以在流中设置多个处理节点,元素在经过每个节点后会被节点的逻辑所处理。比如可以进行过滤、排序、转换等操作。

Stream流的使用可以分为三个步骤:

  • 数据源,创建流
  • 中间操作,可以有多个,生成一个新的流
  • 终端操作,只能有一个,放在最后,代表流中止。

Stream流有几个特点:
1、Stream流一般不会改变数据源,只会生成一个新的数据流。
2、Stream流不会存储数据,只会根据设置的操作节点处理数据。
3、Stream流是延迟执行的,只有在调用终端操作后才会进行流转。

看一下Stream的结构


image.png

使用

数据源生成流

  • 如果是集合的话,可以直接使用stream()创建流。
  • 如果是数组的话,可以使用Arrays.stream()Stream.of()来创建流。
// 集合生成流
List<String> strList = new ArrayList<>();
Stream<String> stream = strList.stream();

//数据生成流
String[] strs = new String[]{"1","2","3"};
Stream<String> stream1 = Arrays.stream(strs);
Stream<String> stream2 = Stream.of(strs);

中间操作

在上边Stream定义中,返回是Stream类型的大多数都是中间操作,入参大多数都是函数式编程,不熟悉的可以看看这篇<Java函数式编程>。常用的中间操作有

  • 过滤操作 filter()
Arrays.stream(strs).filter(s -> s.equals("1"));
  • 排序操作 sorted()
Arrays.stream(strs).sorted();
  • 去重操作 distinct()
Arrays.stream(strs).distinct();
  • 映射操作,将流中元素转换成新的元素
    • mapToInt()转换成Integer类型
    • mapToLong()转换成Long类型
    • mapToDouble()转换成Double类型
    • map() 自定义转换类型,这是一个使用频率非常高的方法。
//将字符串转换成Integer
Arrays.stream(strs).mapToInt(s -> Integer.valueOf(s));
//将字符串转换成Long
Arrays.stream(strs).mapToLong(s -> Long.valueOf(s));
//将字符串转换成Doublde
Arrays.stream(strs).mapToDouble(s -> Double.valueOf(s));
//自定义转换的类型
Arrays.stream(strs).map(s -> new BigDecimal(s));

中间操作是可以有多个的,我们可以根据业务功能组合多个中间操作,比如求数组中字符串包含s的字符串长度排序

Arrays.stream(strs).filter(e->e.contains("s")).map(String::length).sorted();

终端操作

终端操作,表示结束流操作,是在流的最后,常用的有

  • 统计 count()
long count = Arrays.stream(strs).count();
// count=3
  • 获取最小值 min()
// 将字符串转换成Interger类型再比较大小
 OptionalInt min = Arrays.stream(strs).mapToInt(Integer::valueOf).min();
 System.out.println(min.getAsInt());
 // 1
  • 获取最大值 max()
 OptionalInt max = Arrays.stream(strs).mapToInt(Integer::valueOf).max();
 System.out.println(max.getAsInt());
 // 3
  • 匹配
    • anyMatch(),只要有一个匹配就返回true
    • allMatch(),只有全部匹配才返回true
    • noneMatch(),只要有一个匹配就返回 false
boolean all = Arrays.stream(strs).allMatch(s -> s.equals("2"));
boolean any = Arrays.stream(strs).anyMatch(s -> s.equals("2"));
boolean none = Arrays.stream(strs).noneMatch(s -> s.equals("2"));
// all = false
// any = true
// none = false
  • 组合 reduce()将Stream 中的元素组合起来,有两种用法
    • Optional reduce(BinaryOperator accumulator) 没有起始值只有运算规则
    • T reduce(T identity, BinaryOperator accumulator),有运算起始值和运算规则、返回的是和起始值一样的类型
Integer[] integers = new Integer[]{1,2,3};
Optional<Integer> reduce1 = Arrays.stream(integers).reduce((i1, i2) -> i1 + i2);
Integer reduce2 = Arrays.stream(integers).reduce(100, (i1, i2) -> i1 + i2);
// reduce1.get() = 6
// reduce2 = 106
  • 转换 collect(),转换作用是将流再转换成集合或数组,这也是一个使用频率非常高的方法。
    collect()一般配合Collectors使用,Collectors 是一个收集器的工具类,内置了一系列收集器实现,比如toList() 转换成list集合,toMap()转换成Map,toSet()转换成Set集合,joining() 将元素收集到一个可以用分隔符指定的字符串中。
String[] strs = new String[]{"11111", "222", "3"};
//统计每个字符串的长度
List<Integer> lengths = Arrays.stream(strs).map(String::length).collect(Collectors.toList());
String s = Arrays.stream(strs).collect(Collectors.joining(","));
// lengths=[5,3,1]
// s = 11111,222,3

合理的组合Steam操作,可以很大的提升生产力

原理

Stream的实现类中,将Stream划分成了HeadStatelessOpStatefulOpHead控制数据流入,中间操作分为了StatelessOpStatefulOp

StatelessOp代表无状态操作:每个数据的处理是独立的,不会影响或依赖之前的数据。像filter()map()等。

StatefulOp代表有状态操作::处理时会记录状态,比如后面元素的处理会依赖前面记录的状态,或者拿到所有元素才能继续下去等这样有状态的操作,像sorted()

现在已下面代码为例,分析一下Stream的原理

 list.stream()
     .filter(e -> e.length() > 1)
     .sorted()
     .filter(e -> e.equals("333"))
     .collect(Collectors.toList());

数据源生成流

首先,进入到list.stream()

//Collection#stream

 default Stream<E> stream() {
    return StreamSupport.stream(spliterator(), false);
 }

default Spliterator<E> spliterator() {
    return Spliterators.spliterator(this, 0);
 }

//StreamSupport#stream
public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
    Objects.requireNonNull(spliterator);
    return new ReferencePipeline.Head<>(spliterator,
                                        StreamOpFlag.fromCharacteristics(spliterator),
                                        parallel);
}

将原数据封装成Spliterator,同时生成一个Head,将Spliterator放到Head中。

中间操作

接着分析中间操作.filter(e -> e.length() > 1)的代码

//ReferencePipeline#filter
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
    Objects.requireNonNull(predicate);
    return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                  StreamOpFlag.NOT_SIZED) {
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
            return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                @Override
                public void begin(long size) {
                    downstream.begin(-1);
                }

                @Override
                public void accept(P_OUT u) {
                    if (predicate.test(u))
                        downstream.accept(u);
                }
            };
        }
    };
}

返回的是一个无状态操作StatelessOp,查看StatelessOp的构造函数

// AbstractPipeline#AbstractPipeline
  AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
      if (previousStage.linkedOrConsumed)
          throw new IllegalStateException(MSG_STREAM_LINKED);
      previousStage.linkedOrConsumed = true;
      previousStage.nextStage = this;

      this.previousStage = previousStage;
      this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
      this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
      this.sourceStage = previousStage.sourceStage;
      if (opIsStateful())
          sourceStage.sourceAnyStateful = true;
      this.depth = previousStage.depth + 1;
  }

构造函数中有previousStage.nextStage = this;this.previousStage = previousStage;,相当于将当前的StatelessOp操作拼接到Head后面,构成了一条双向链表。

再看后面的.sorted().filter(e -> e.equals("333")).limit(10),也会将操作添加到了双向链表后面。.sorted()在链表后面添加的是StatefulOp有状态操作。

终端操作

最后走到终端操作.collect(Collectors.toList())。进入到collect()

//ReferencePipeline#collect
public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
    A container;
    if (isParallel()
            && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
            && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
        container = collector.supplier().get();
        BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
        forEach(u -> accumulator.accept(container, u));
    }
    else {
        container = evaluate(ReduceOps.makeRef(collector));
    }
    return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
            ? (R) container
            : collector.finisher().apply(container);
}

并发操作先不看,直接看container = evaluate(ReduceOps.makeRef(collector));ReduceOps.makeRef()返回是TerminalOp,代表的是终端操作。

[图片上传失败...(image-40689-1677743203179)]

evaluate()

//AbstractPipeline#evaluate
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
  assert getOutputShape() == terminalOp.inputShape();
    if (linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    linkedOrConsumed = true;

    return isParallel()
            ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
            : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
    }

先不管并行,进串行入evaluateSequential()

//ReduceOps#evaluateSequential
public <P_IN> R evaluateSequential(PipelineHelper<T> helper,
                                    Spliterator<P_IN> spliterator) {
    return helper.wrapAndCopyInto(makeSink(), spliterator).get();
}

makeSink()将返回一个Sink实例,并作为参数和 spliterator 一起传入最后一个节点(terminalOp)的 wrapAndCopyInto() 方法

//AbstractPipeline#wrapAndCopyInto
final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
    copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
    return sink;
}

final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
     Objects.requireNonNull(sink);

    for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
        sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
    }
    return (Sink<P_IN>) sink;
}

wrapSink()将最后一个节点创建的 Sink 传入,并且看到里面有个 for 循环。这个 for 循环是从最后一个节点开始,到第二个节点结束。每一次循环都是将上一节点的 combinedFlags 和当前的 Sink 包起来生成一个新的 Sink 。这和前面拼接各个操作很类似,只不过拼接的是 Sink 的实现类的实例,方向相反。

image.png

到现在整个流水已经拼接完成。真正的数据处理在copyInto()中。

//AbstractPipeline#copyInto
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
    Objects.requireNonNull(wrappedSink);

    if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
        wrappedSink.begin(spliterator.getExactSizeIfKnown());
        spliterator.forEachRemaining(wrappedSink);
        wrappedSink.end();
    }
    else {
        copyIntoWithCancel(wrappedSink, spliterator);
    }
}

Sink中有三个方法:

  • begin:节点开始准备
  • accept: 节点处理数据
  • end: 节点处理结束

Sink与操作是相关的,不同的Sink有不同的职责,无状态操作的 Sink 接收到通知或者数据,处理完了会马上通知自己的下游。有状态操作的 Sink 则像有一个缓冲区一样,它会等要处理的数据处理完了才开始通知下游,并将自己处理的结果传递给下游。

比如filter这种无状态的操作,处理完数据会直接交给下游,而像sorted这种无有状态的操作在begin阶段会先创建一个容器,accept会将流转过来的数据保存起来,最后在执行 end方法时才正在开始排序。排序之后再将数据,采用同样的方式依次传递给下游节点。

wrapAndCopyInto() 返回了 TerminalOps 创建的 Sink,这时候它里面已经包含了最终处理的结果。调用它的 get() 方法就获得了最终的结果。

Steam还可以支持并行流,把list.stream()换成list.parallelStream()即可使用并行操作。

并行过程中,构建操作链的双向链表是不变的,区别实在构建完后的操作

//AbstractPipeline#evaluate
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
  assert getOutputShape() == terminalOp.inputShape();
    if (linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    linkedOrConsumed = true;

    return isParallel()
            ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
            : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
    }

这次进入到 evaluateParallel()

//ReduceOps#evaluateSequential
public <P_IN> R evaluateParallel(PipelineHelper<T> helper,
                                         Spliterator<P_IN> spliterator) {
    return new ReduceTask<>(this, helper, spliterator).invoke().get();
}

ReduceTask继承自ForkJoinTaskSteam的并行底层用的是ForkJoin框架。

有关Java8中的Stream流的更多相关文章

  1. ruby - 如何从 ruby​​ 中的字符串运行任意对象方法? - 2

    总的来说,我对ruby​​还比较陌生,我正在为我正在创建的对象编写一些rspec测试用例。许多测试用例都非常基础,我只是想确保正确填充和返回值。我想知道是否有办法使用循环结构来执行此操作。不必为我要测试的每个方法都设置一个assertEquals。例如:describeitem,"TestingtheItem"doit"willhaveanullvaluetostart"doitem=Item.new#HereIcoulddotheitem.name.shouldbe_nil#thenIcoulddoitem.category.shouldbe_nilendend但我想要一些方法来使用

  2. ruby - 其他文件中的 Rake 任务 - 2

    我试图在一个项目中使用rake,如果我把所有东西都放到Rakefile中,它会很大并且很难读取/找到东西,所以我试着将每个命名空间放在lib/rake中它自己的文件中,我添加了这个到我的rake文件的顶部:Dir['#{File.dirname(__FILE__)}/lib/rake/*.rake'].map{|f|requiref}它加载文件没问题,但没有任务。我现在只有一个.rake文件作为测试,名为“servers.rake”,它看起来像这样:namespace:serverdotask:testdoputs"test"endend所以当我运行rakeserver:testid时

  3. ruby-on-rails - Ruby net/ldap 模块中的内存泄漏 - 2

    作为我的Rails应用程序的一部分,我编写了一个小导入程序,它从我们的LDAP系统中吸取数据并将其塞入一个用户表中。不幸的是,与LDAP相关的代码在遍历我们的32K用户时泄漏了大量内存,我一直无法弄清楚如何解决这个问题。这个问题似乎在某种程度上与LDAP库有关,因为当我删除对LDAP内容的调用时,内存使用情况会很好地稳定下来。此外,不断增加的对象是Net::BER::BerIdentifiedString和Net::BER::BerIdentifiedArray,它们都是LDAP库的一部分。当我运行导入时,内存使用量最终达到超过1GB的峰值。如果问题存在,我需要找到一些方法来更正我的代

  4. ruby-on-rails - Rails 3 中的多个路由文件 - 2

    Rails2.3可以选择随时使用RouteSet#add_configuration_file添加更多路由。是否可以在Rails3项目中做同样的事情? 最佳答案 在config/application.rb中:config.paths.config.routes在Rails3.2(也可能是Rails3.1)中,使用:config.paths["config/routes"] 关于ruby-on-rails-Rails3中的多个路由文件,我们在StackOverflow上找到一个类似的问题

  5. ruby-on-rails - Rails - 一个 View 中的多个模型 - 2

    我需要从一个View访问多个模型。以前,我的links_controller仅用于提供以不同方式排序的链接资源。现在我想包括一个部分(我假设)显示按分数排序的顶级用户(@users=User.all.sort_by(&:score))我知道我可以将此代码插入每个链接操作并从View访问它,但这似乎不是“ruby方式”,我将需要在不久的将来访问更多模型。这可能会变得很脏,是否有针对这种情况的任何技术?注意事项:我认为我的应用程序正朝着单一格式和动态页面内容的方向发展,本质上是一个典型的网络应用程序。我知道before_filter但考虑到我希望应用程序进入的方向,这似乎很麻烦。最终从任何

  6. ruby-on-rails - Rails 3.2.1 中 ActionMailer 中的未定义方法 'default_content_type=' - 2

    我在我的项目中添加了一个系统来重置用户密码并通过电子邮件将密码发送给他,以防他忘记密码。昨天它运行良好(当我实现它时)。当我今天尝试启动服务器时,出现以下错误。=>BootingWEBrick=>Rails3.2.1applicationstartingindevelopmentonhttp://0.0.0.0:3000=>Callwith-dtodetach=>Ctrl-CtoshutdownserverExiting/Users/vinayshenoy/.rvm/gems/ruby-1.9.3-p0/gems/actionmailer-3.2.1/lib/action_mailer

  7. ruby-on-rails - Rails 应用程序中的 Rails : How are you using application_controller. rb 是新手吗? - 2

    刚入门rails,开始慢慢理解。有人可以解释或给我一些关于在application_controller中编码的好处或时间和原因的想法吗?有哪些用例。您如何为Rails应用程序使用应用程序Controller?我不想在那里放太多代码,因为据我了解,每个请求都会调用此Controller。这是真的? 最佳答案 ApplicationController实际上是您应用程序中的每个其他Controller都将从中继承的类(尽管这不是强制性的)。我同意不要用太多代码弄乱它并保持干净整洁的态度,尽管在某些情况下ApplicationContr

  8. ruby-on-rails - form_for 中不在模型中的自定义字段 - 2

    我想向我的Controller传递一个参数,它是一个简单的复选框,但我不知道如何在模型的form_for中引入它,这是我的观点:{:id=>'go_finance'}do|f|%>Transferirde:para:Entrada:"input",:placeholder=>"Quantofoiganho?"%>Saída:"output",:placeholder=>"Quantofoigasto?"%>Nota:我想做一个额外的复选框,但我该怎么做,模型中没有一个对象,而是一个要检查的对象,以便在Controller中创建一个ifelse,如果没有检查,请帮助我,非常感谢,谢谢

  9. ruby - rspec 需要 .rspec 文件中的 spec_helper - 2

    我注意到像bundler这样的项目在每个specfile中执行requirespec_helper我还注意到rspec使用选项--require,它允许您在引导rspec时要求一个文件。您还可以将其添加到.rspec文件中,因此只要您运行不带参数的rspec就会添加它。使用上述方法有什么缺点可以解释为什么像bundler这样的项目选择在每个规范文件中都需要spec_helper吗? 最佳答案 我不在Bundler上工作,所以我不能直接谈论他们的做法。并非所有项目都checkin.rspec文件。原因是这个文件,通常按照当前的惯例,只

  10. ruby-on-rails - active_admin 目录中的常量警告重新声明 - 2

    我正在使用active_admin,我在Rails3应用程序的应用程序中有一个目录管理,其中包含模型和页面的声明。时不时地我也有一个类,当那个类有一个常量时,就像这样:classFooBAR="bar"end然后,我在每个必须在我的Rails应用程序中重新加载一些代码的请求中收到此警告:/Users/pupeno/helloworld/app/admin/billing.rb:12:warning:alreadyinitializedconstantBAR知道发生了什么以及如何避免这些警告吗? 最佳答案 在纯Ruby中:classA

随机推荐