我们正在从基于python的环境转移到scalaplay框架之一,我必须编写一个缓冲的发布者订阅者,它将与另一个用python编写的系统交互。这里的目标是编写一个发布者,它会从缓冲区/队列中获取消息,并通过TCP套接字将它们发送到服务器,并保持套接字打开,以便我们以后可以重用它。我们的旧python发布者创建了一个永远不会关闭的socket.socket(socket.AF_INET,socket.SOCK_STREAM)套接字,并使用调度程序从队列中取出消息并通过流式套接字发送它们。我对使用akkastreamingio很感兴趣,但我不知道如何创建一个与调度程序一起工作的流套接字,定
我有以下流程:valactorSource=Source.actorRef(10000,OverflowStrategy.dropHead)valtargetSink=Flow[ByteString].map(_.utf8String).via(newJsonStage()).map{json=>MqttMessages.jsonToObject(json)}.to(Sink.actorRef(self,"Done"))sourceRef=Some(Flow[ByteString].via(conn.flow).to(targetSink).runWith(actorSource))在
我有一个简单的客户端服务器程序服务器监听如下valmanager=IO(Tcp)manager!Bind(self,myAddress,1,options)然后在接收循环overridedefreceive={caseb@Bound(addr)=>{log.info("bound")myAddress=addrbBound=true}casec@Connected(remoteAddress,localAddress)=>log.info("ClientConnected.Remote:{}Local:{}",remoteAddress,localAddress)myAPAddress
我正在尝试使用akka-streams的Tcp客户端向数据库发送查询,但我不明白我错过了什么。所以我有两种类型Query和Response可以完美地转换为akka的ByteString或从ByteString转换。因此,我正在使用valconn=Tcp().outgoingConnection("localhost",28015)创建客户端连接,这为我提供了一个Flow[ByteString,ByteString,Future[OutgoingConnection]],到目前为止一切顺利。所以我假设源是我对查询的请求,我找不到用查询源提供此流程的最佳方法,而是像Source(Futur
在下面的代码中GSMmockunbound永远不会记录,即使“禁用”消息已发送到服务器。如何正确解除绑定(bind)akkatcp服务器?classGsmRouterextendsActor{importTcp._importcontext.systemvalname=this.getClass().getName()vallogger=LoggerFactory.getLogger(name)defreceive={case"enable"=>IO(Tcp)!Bind(self,ConfigurationUtils.gsmRouterAddress)case"disable"=>IO
我正在尝试在我的PlayFramework应用程序中共享我使用SORM框架创建的内存中H2数据库。下面是我的数据库代码objectDBextendsInstance(entities=Seq(Entity[Person]()),url="jdbc:h2:mem:db1"){}H2数据库站点上所述的解决方案是启动TCP服务器。在Java应用程序中,我可以使用以下代码共享数据库org.h2.tools.Serverserver=org.h2.tools.Server.createTcpServer();server.start();Connectionconn=DriverManager.
我正在尝试使用Akka流构建一个简单的tcp服务器。Tcp().bind(props.host,props.port).to(Sink.foreach(_.handleWith(handler))).run().onComplete{caseSuccess(i)=>logger.info(s"Serverisboundat${props.host}:${props.port}")caseFailure(e)=>logger.error("Serverbindingfailure",e)}我想一次允许最多一个连接。为此,我将以下行添加到我的application.conf文件中。akka
在Akka中有什么方法可以像在Erlang中那样使用{packet,4}实现数据包框架?数据包看起来像这样:4byteslengthinbigendian|body...例如:00000005HELLO0005WORLD将是两个数据包“HELLO”和“WORLD”,但它们被作为一个数据包接收。或者00000005HELL现在Akka接收到这8个字节,但还缺少一个字节,它将在下一次调用“接收”时接收问题是我的Actor的接收总是被部分或全部请求调用,但我只想在接收中获得“body”部分,并且只有在它被完全接收时。因此,它所需要的只是首先读取这4个字节,然后等待读取其他N个字节(N=4字节
做数据处理的程序员一定碰到过一个很有意思的问题,Flink到底用什么语言开发?Scala还是Java?国内部分程序员对scala开发flink好像存在着偏见或者是迷茫,一般是因为你能找到的flink项目大多是java写的。想要弄明白这个问题,首先要知道这个问题为什么会发生,作者在网上查看了相关的词条,并且根据开发经验,大致总结了一下对这个事情的个人看法。首先这个问题牵扯了一部分spark,2009年的时候spark作为第一个弥补MR无法基于内存计算缺陷的第二代大数据计算框架诞生于伯克利大学。这里的第一个是值基础架构相对完善,没有很严重的缺陷,单纯的谈论有无来说它前面还有个storm。但是sto
使用Scala2.10和Akka2.3.4,我组装了一个简单的代理服务器,它接受传入的TCP连接,然后将这些消息代理到远程服务器。一切都在使用纯文本,但我坚持使用SSL。简而言之,这就是我为传入连接启动非安全服务器的方式:valserver=system.actorOf(Props(newLegacyTCPServer),name="my-tcp-server")implicitvalbindingTimeout=Timeout(1.second)importsystem.dispatcher//executioncontextforthefuturevalboundFuture=IO