草庐IT

pseudo-streaming

全部标签

java - Kafka Streams - 处理超时

我正在尝试使用.process()用TimeWindows.of("name",30000)批处理一些KTable值并发送它们。似乎30秒超过了消费者超时间隔,在此之后Kafka认为该消费者已失效并释放分区。我已经尝试提高轮询和提交间隔的频率来避免这种情况:config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,"5000");config.put(StreamsConfig.POLL_MS_CONFIG,"5000");不幸的是,这些错误仍在发生:(很多)ERRORo.a.k.s.p.internals.RecordCollector-E

Java Stream sum()短路

这个问题在这里已经有了答案:Howtoshort-circuitareduce()operationonaStream?(4个答案)关闭4年前。在做项目的时候写了这行,基本上是根据有多少子节点来决定是否合并当前节点。intsuccNodes=Arrays.stream(children).mapToInt(PRQuadNode::count).sum();if(succNodes问题是succNodes通常会比bucketingParam大很多。如果我已经找到足够大的数目,就没有必要继续数下去了。如果我知道我将无法通过检查succNodes注意:在这种情况下,子项始终为4号。注2:PR

java - 使用 Java 8 Streams 映射、聚合和组合总计

我正在尝试重新创建一个过程来创建一个对象列表,这些对象列表是使用Java8Streams的另一个对象列表的聚合。例如,我有一个类,如下所述,它是通过数据库调用或类似方式提供的publicclassOrder{privateStringorderNumber;privateStringcustomerNumber;privateStringcustomerGroup;privateDatedeliveryDate;privatedoubleorderValue;privatedoubleorderQty;}在我的应用程序的其他地方,我有一个OrderTotal类,它表示按客户编号和组对订

java - Streaming 的多项操作能否打破 Demeter 法则?

我有点想用Java8流编写Selenium页面对象,如下面的代码所述,并收到评论说我的代码违反了Demeter法则,因为我在一行中执行了很多操作。我被建议将代码分解为第一个流以收集列表并运行另一个流操作来进行匹配(简而言之,根据需要将其分解为多个流)。我不相信,因为引入Stream是为了处理数据处理,如果我们将它分解成多个流,那么使用流就没有意义了。之前我曾在一个网络安全项目中工作,其中数百万条记录通过流式处理和多个逻辑操作对数据进行排序。请分享您的想法,我已按照审阅者的建议对其进行了更改,但他无法解释原因,我想了解有关流的更多信息以及利用Java8的这一强大新增功能的正确方法。示例代

java - Spliterator 与 Stream.Builder

我读了一些关于如何创建有限的Stream的问题(FinitegeneratedStreaminJava-howtocreateone?,Howdostreamsstop?).建议实现Spliterator的答案。Spliterator将实现如何提供以及下一个提供哪个元素的逻辑(tryAdvance)。但是还有另外两个非默认方法trySplit和estimateSize()我必须实现。Spliterator的JavaDoc说:Anobjectfortraversingandpartitioningelementsofasource.Thesourceofelementscoveredby

java - 如何在 Java 8 Stream API 中使用 Collectors.grouping 来创建 map

我第一次寻找Java8的StreamAPI。我尝试创建一个过滤器来从Map中删除元素。这是我的map:Mapm=newHashMap();我想删除值)。这是我一直在尝试的:m.entrySet().stream().filter(p->p.getValue()>0).collect(Collectors.groupingBy(s->s.getKey()));我得到一个HashMap>。所以,这不是我想要的。我也试过:m.entrySet().stream().filter(p->p.getValue()>0).collect(Collectors.groupingBy(Map::Ent

java - 如何使用 Stream API 随机播放流?

我决定采用函数式方法生成字符串或随机字符,到目前为止我想到了这个,它应该比装箱然后使用StringJoiner作为收集器执行得更好:Randomrandom=newRandom();StringrandomString=IntStream.concat(random.ints(8,'a','z'),random.ints(8,'A','Z')).collect(StringBuilder::new,(sb,i)->sb.append((char)i),(sb1,sb2)->sb1.append(sb2)).toString();我想生成16个字符的流,范围从a-z或A-Z,我遇到的问题

具有 3 个参数的 Java8 stream.reduce() - 获得透明度

我编写这段代码是为了将单词列表缩减为一个很长的计数,即有多少单词以“A”开头。我编写它只是为了学习Java8,所以我想更好地理解它[免责声明:我意识到这可能不是编写此代码的最佳方式;这只是为了练习!]。LongcountOfAWords=results.stream().reduce(0L,(a,b)->b.charAt(0)=='A'?a+1:a,Long::sum);中间参数/lambda(称为累加器)似乎能够在没有最终“Combiner”参数的情况下减少完整列表。事实上,Javadoc实际上说:The{@codeaccumulator}functionactsasafusedma

java - Stream.sorted() 然后收集,还是收集然后 List.sort()?

这个问题在这里已经有了答案:Whatismoreefficient:sortedstreamorsortingalist?(3个答案)关闭4年前。总的来说,这两段代码在性能上有区别吗?Listlist1=someStream1.sorted().collect(toList());//vs.Listlist2=someStream2.collect(toList());list2.sort(Comparator.naturalOrder())变体2显然令人讨厌,应该避免,但我很好奇Stream的主流(嘿,mainstream)实现是否内置了任何性能优化,从而提高了性能两者的区别。我想因

java - 输入 stream.read 返回 0 或 -1?

有什么区别byte[]buffer=newbyte[1024];//this:if(inputStream.read(buffer)>0){/*...*/}//and:if(inputStream.read(buffer)!=-1){/*...*/}都可以判断网络流终止吗? 最佳答案 InputStream.read()的Javadocs说:Ifthelengthofbiszero,thennobytesarereadand0isreturned在正常使用中,这种情况永远不会发生,因此明确测试这种情况没有多大意义。(如果您想避免永远