草庐IT

akka-stream

全部标签

java - 知道akka actor存在的三种方法

我正在研究akkaactors(JAVA),最近了解到有3种方法(可能更多)可以了解actor的存在。发送身份信息:ActorSelectionsel=actorSystem.actorSelection("akka://test/user/TestActor");AskableActorSelectionasker=newAskableActorSelection(sel);Futurefuture=asker.ask(newIdentify(1),newTimeout(5,TimeUnit.SECONDS));ActorIdentityidentity=(ActorIdentity

java - 合并多个相同的 Kafka Streams 主题

我有2个Kafka主题流式传输来自不同来源的完全相同的内容,因此我可以在其中一个来源出现故障时保持高可用性。我正在尝试使用KafkaStreams0.10.1.0将2个主题合并为1个输出主题,这样我就不会错过任何有关失败的消息,并且在所有源都启动时不会出现重复。当使用KStream的leftJoin方法时,其中一个主题可以正常下降(次要主题),但是当主要主题下降时,不会向输出主题发送任何内容。这似乎是因为,根据KafkaStreamsdeveloperguide,KStream-KStreamleftJoinisalwaysdrivenbyrecordsarrivingfromthep

java - Java Stream API 是如何选择执行计划的?

我刚开始学习Java8中的StreamAPI和一般的函数式编程,但对Java并不陌生。我有兴趣了解和了解StreamAPI如何选择执行计划。它如何知道哪些部分需要并行化,哪些部分不需要?存在多少种执行计划?基本上,我想知道为什么Java8中的Streams有助于使事情变得更快,以及它如何发挥这种“魔力”。我找不到太多关于这一切如何运作的文献。 最佳答案 这个问题有点宽泛,不好详细解释,但我会尽力回答到满意的程度。我还使用了ArrayList的Stream示例。当我们创建流时,返回的对象称为ReferencePipeline.这个对象

java - Stream collect with Generic 类型

我尝试在将json反序列化为pojo的方法中使用泛型,以便它可以返回任何对象类型。这是我的代码:privateBla(Listas,Listbs){this.as=as;this.bs=bs;}publicstaticBlafrom(JsonObjectjson){returnnewBla(Bla.load(json,As),Bla.load(json,Bs));}privatestaticListload(JsonObjectjsonObject,Stringparam){returnjsonObject.getJsonArray(param).stream().map(Bla::g

java - 如何用 Streams 替换 Iterables.filter()?

我正在尝试从Guava迁移到Java8Streams,但不知道如何处理可迭代对象。这是我的代码,用于从可迭代对象中删除空字符串:Iterablelist=Iterables.filter(raw,//it'sIterablenewPredicate(){@Overridepublicbooleanapply(Stringtext){return!text.isEmpty();}});注意,这是一个Iterable,不是Collection.它可能包含无限数量的项目,我无法将它们全部加载到内存中。我的Java8替代品是什么?顺便说一句,有了Lamba,这段代码看起来会更短:Iterabl

java - 为什么Stream <T> collect方法返回不同的键顺序?

我有以下代码:publicenumContinent{ASIA,EUROPE}publicclassCountry{privateStringname;privateContinentregion;publicCountry(Stringna,Continentreg){this.name=na;this.region=reg;}publicStringgetName(){returnname;}publicContinentgetRegion(){returnregion;}@OverridepublicStringtoString(){return"Country[name="+n

java - Akka.io,在类 Actor 上找不到匹配的构造函数

我正在尝试设置Akkaactor来处理游戏中的网络套接字。我已经定义了一个简单的actor来通过网络套接字发送消息:packageactors;importakka.actor.*;publicclassMyWebSocketActorextendsUntypedActor{publicstaticPropsprops(ActorRefout){returnProps.create(MyWebSocketActor.class,out);}privatefinalActorRefout;publicMyWebSocketActor(ActorRefout){this.out=out;}

java - JSch 0.1.53 session.connect() 抛出 "End of IO Stream Read"

我下载了一个新的JSch0.1.53libraryJSch(sftp)下载任务不再有效。此版本在session.connect()函数上失败并抛出错误Session.connect:java.io.IOException:EndofIOStreamRead。我的旧jsch.jar(2011-10-06)在同一台主机上工作正常,也许我缺少新的配置Prop?Sessionsession=null;ChannelSftpchannel=null;try{JSch.setLogger(SSHUtil.createJschLogger());JSchjsch=newJSch();session=

java - 为什么 Java Stream 生成器是无序的?

我尝试使用JavaStreams并行化一些工作。让我们考虑这个简单的例子:Stream.generate(newSupplier(){@OverridepublicIntegerget(){returngenerateNewInteger();}}).parallel().forEachOrdered(newConsumer(){@Overridepublicvoidaccept(Integerinteger){System.out.println(integer);}});问题是它不会为forEachOrdered调用accept方法,它只有在我使用forEach时才有效。我想问题是

java - Akka 远程 actor 服务器发现

我想在集群上部署一个用akka制作的远程actor软件。该系统由多个工作节点和一个主节点组成。问题是我无法提前知道集群节点的IP地址(但我知道它们都属于同一子网)。因此,我需要一种在启动后发现每个人的IP地址的好方法,以便在每个节点上创建正确的actor引用。我正在寻找在任何自由软件许可下分发的轻量级解决方案(我只需要它进行初始设置)。 最佳答案 前一段时间我创建了一个prototype旨在解决您的问题(请随意重用代码和/或做出贡献)。简单介绍一下它是如何工作的。它为每个参与者注册表(=节点)启动一个远程参与者。RegistryAc