作者:禅与计算机程序设计艺术1.简介数据流是一个连续不断的、产生、存储和处理数据的过程。传统上,数据流编程都是基于特定平台(比如:消息队列,数据仓库,事件溯源)的SDK或者API进行开发,但随着云计算和容器技术的发展,越来越多的企业选择使用开源工具实现自己的大数据处理系统。其中ApacheFlink和ApacheKafka这两个开源项目提供了丰富的数据处理能力。本文将从Flink和Kafka的基本用法出发,通过一个案例来介绍如何利用这两个框架构建一个实时的数据流管道。阅读本文后,读者应该能够理解并掌握以下知识点:Flink与Kafka的特点及区别数据流编程模型:时间复杂度分析和异步计算用Fli
我们正在从基于python的环境转移到scalaplay框架之一,我必须编写一个缓冲的发布者订阅者,它将与另一个用python编写的系统交互。这里的目标是编写一个发布者,它会从缓冲区/队列中获取消息,并通过TCP套接字将它们发送到服务器,并保持套接字打开,以便我们以后可以重用它。我们的旧python发布者创建了一个永远不会关闭的socket.socket(socket.AF_INET,socket.SOCK_STREAM)套接字,并使用调度程序从队列中取出消息并通过流式套接字发送它们。我对使用akkastreamingio很感兴趣,但我不知道如何创建一个与调度程序一起工作的流套接字,定
我有以下流程:valactorSource=Source.actorRef(10000,OverflowStrategy.dropHead)valtargetSink=Flow[ByteString].map(_.utf8String).via(newJsonStage()).map{json=>MqttMessages.jsonToObject(json)}.to(Sink.actorRef(self,"Done"))sourceRef=Some(Flow[ByteString].via(conn.flow).to(targetSink).runWith(actorSource))在
我有一个简单的客户端服务器程序服务器监听如下valmanager=IO(Tcp)manager!Bind(self,myAddress,1,options)然后在接收循环overridedefreceive={caseb@Bound(addr)=>{log.info("bound")myAddress=addrbBound=true}casec@Connected(remoteAddress,localAddress)=>log.info("ClientConnected.Remote:{}Local:{}",remoteAddress,localAddress)myAPAddress
我有一个actor在preStart上绑定(bind)一个端口,然后期待Tcp.Bound消息。然后,它将等待Tcp.Connected发生。这个Actor没有向其创建者提供任何东西,所以我想接收Tcp消息和/或模拟Tcp管理器到目前为止,我尝试将我的TestKit探测器订阅到tcp消息。除此之外,我希望创建一个可以覆盖管理器的类,但仍然不知道该怎么做。我正在使用Java8和JUnit5。@OverridepublicvoidpreStart(){this.connection=Tcp.get(getContext().getSystem()).manager();this.conne
我与服务器端建立了HTTP连接,使用System.IO.Stream.Read读取HTTP请求正文消息。问题是每隔几分钟服务器就会卡在Read语句上,并且在达到套接字超时或客户端关闭连接之前不会继续。intbytesRead=0;while(bytesRead如果流没有contentLength变量指定的数据量,就会发生这种情况。事实并非如此,因为当使用WireShark跟踪tcp流时,我看到整个消息正文(由contentLength指定)已到达服务器计算机。它仅在第一次“使用”while循环时发生,即仅在第一次流中没有“contentLength”字节数可在一次尝试和while中读取
我对AkkaTCPIO有奇怪的行为,问题是连接重置,由于从处理程序或Terminating处理程序显式调用TcpMessage.abort()。对等端未收到Tcp.ConnectionClosed事件。示例:接收处理程序@OverridepublicvoidonReceive(Objectmsg)throwsException{if(msginstanceofTcp.ConnectionClosed){log.info("ServerConnectionClosed:{}",msg);getContext().stop(getSelf());}elseif(msginstanceofT
我正在尝试使用akka-streams的Tcp客户端向数据库发送查询,但我不明白我错过了什么。所以我有两种类型Query和Response可以完美地转换为akka的ByteString或从ByteString转换。因此,我正在使用valconn=Tcp().outgoingConnection("localhost",28015)创建客户端连接,这为我提供了一个Flow[ByteString,ByteString,Future[OutgoingConnection]],到目前为止一切顺利。所以我假设源是我对查询的请求,我找不到用查询源提供此流程的最佳方法,而是像Source(Futur
我有2个系统:系统1正在运行akka和HAProxy,系统2正在运行向akka发出请求的REST组件。Akka在系统1的端口4241上运行。当没有HAProxy时,系统2能够连接到系统1。我在系统1上安装HAProxy后,从系统2到系统1的请求出错,日志如下:ERROR[deal-akka.actor.default-dispatcher-18]EndpointWriter-droppingmessage[classakka.actor.ActorSelectionMessage]fornon-localrecipient[Actor[akka.tcp://akkaSystemName
在下面的代码中GSMmockunbound永远不会记录,即使“禁用”消息已发送到服务器。如何正确解除绑定(bind)akkatcp服务器?classGsmRouterextendsActor{importTcp._importcontext.systemvalname=this.getClass().getName()vallogger=LoggerFactory.getLogger(name)defreceive={case"enable"=>IO(Tcp)!Bind(self,ConfigurationUtils.gsmRouterAddress)case"disable"=>IO