草庐IT

java - Spring 集成 : TCP Client/Server opening one client connection per server connection

coder 2023-09-19 原文

我正在尝试使用 Spring Integration 实现一个 TCP 客户端/服务器应用程序,我需要为每个传入的 TCP 服务器连接打开一个 TCP 客户端套接字。

基本上,我有一堆物联网设备通过原始 TCP 套接字与后端服务器通信。我需要在系统中实现额外的功能。但是设备和服务器上的软件都是封闭源代码,所以我对此无能为力。所以我的想法是在设备和服务器之间放置中间件,以拦截此客户端/服务器通信并提供附加功能。

我正在使用带有入站/出站 channel 适配器的 TcpNioServerConnectionFactoryTcpNioClientConnectionFactory 向各方发送消息/从各方接收消息。但是消息结构中没有将消息绑定(bind)到某个设备的信息;因此,每次服务器套接字上出现来自新设备的新连接时,我都必须打开一个新的客户端套接字到后端。此客户端连接必须绑定(bind)到该特定服务器套接字的生命周期。它绝不能被重用,如果这个客户端套接字(后端到中间件)由于任何原因而死,服务器套接字(中间件到设备)也必须关闭。我该怎么做?

编辑:我的第一个想法是将 AbstractClientConnectionFactory 子类化,但它似乎除了在被询问时提供客户端连接外什么都不做。我应该研究入站/出站 channel 适配器的子类化还是其他地方?我还应该提到,我也对非 Spring 集成解决方案持开放态度,例如 Apache Camel,甚至是带有原始 NIO 套接字的自定义解决方案。

编辑 2:我通过切换到 TcpNetServerConnectionFactory 并使用 ThreadAffinityClientConnectionFactory 包装客户端工厂而完成了一半,并且设备可以正常到达后端。但是,当后端发回某些内容时,我收到错误 Unable to find outbound socket for GenericMessage 并且客户端套接字终止。我认为这是因为后端没有正确路由消息所需的 header 。我怎样才能捕捉到这些信息?我的配置类如下:

@Configuration
@EnableIntegration
@IntegrationComponentScan
public class ServerConfiguration {

    @Bean
    public AbstractServerConnectionFactory serverFactory() {
        AbstractServerConnectionFactory factory = new TcpNetServerConnectionFactory(8000);
        factory.setSerializer(new MapJsonSerializer());
        factory.setDeserializer(new MapJsonSerializer());
        return factory;
    }

    @Bean
    public AbstractClientConnectionFactory clientFactory() {
        AbstractClientConnectionFactory factory = new TcpNioClientConnectionFactory("localhost", 3333);
        factory.setSerializer(new MapJsonSerializer());
        factory.setDeserializer(new MapJsonSerializer());
        factory.setSingleUse(true);
        return new ThreadAffinityClientConnectionFactory(factory);
    }

    @Bean
    public TcpReceivingChannelAdapter inboundDeviceAdapter(AbstractServerConnectionFactory connectionFactory) {
        TcpReceivingChannelAdapter inbound = new TcpReceivingChannelAdapter();
        inbound.setConnectionFactory(connectionFactory);
        return inbound;
    }

    @Bean
    public TcpSendingMessageHandler outboundDeviceAdapter(AbstractServerConnectionFactory connectionFactory) {
        TcpSendingMessageHandler outbound = new TcpSendingMessageHandler();
        outbound.setConnectionFactory(connectionFactory);
        return outbound;
    }

    @Bean
    public TcpReceivingChannelAdapter inboundBackendAdapter(AbstractClientConnectionFactory connectionFactory) {
        TcpReceivingChannelAdapter inbound = new TcpReceivingChannelAdapter();
        inbound.setConnectionFactory(connectionFactory);
        return inbound;
    }

    @Bean
    public TcpSendingMessageHandler outboundBackendAdapter(AbstractClientConnectionFactory connectionFactory) {
        TcpSendingMessageHandler outbound = new TcpSendingMessageHandler();
        outbound.setConnectionFactory(connectionFactory);
        return outbound;
    }

    @Bean
    public IntegrationFlow backendIntegrationFlow() {
        return IntegrationFlows.from(inboundBackendAdapter(clientFactory()))
                .log(LoggingHandler.Level.INFO)
                .handle(outboundDeviceAdapter(serverFactory()))
                .get();
    }

    @Bean
    public IntegrationFlow deviceIntegrationFlow() {
        return IntegrationFlows.from(inboundDeviceAdapter(serverFactory()))
                .log(LoggingHandler.Level.INFO)
                .handle(outboundBackendAdapter(clientFactory()))
                .get();
    }
}

最佳答案

目前还不完全清楚你在问什么,所以我假设你的意思是你想要在你的客户端和服务器之间有一个 spring 集成代理。像这样的东西:

iot-device -> spring server -> message-transformation -> spring client -> back-end-server

如果是这种情况,您可以实现包装标准工厂的 ClientConnectionIdAware 客户端连接工厂。

在集成流程中,将消息中传入的 ip_connectionId header 绑定(bind)到线程(在 ThreadLocal 中)。

然后,在客户端连接工厂中,使用ThreadLocal值在一个Map中查找对应的传出连接;如果未找到(或关闭),则创建一个新的并将其存储在 map 中以供将来重复使用。

实现一个ApplictionListener(或@EventListener)来监听来自服务器连接工厂的TcpConnectionCloseEventclose() 相应的出站连接。

这听起来像是一个很酷的增强功能,因此请考虑将其贡献回框架。

编辑

5.0 版添加了 ThreadAffinityClientConnectionFactory,它可以与 TcpNetServerConnectionFactory 开箱即用,因为每个连接都有自己的线程。

使用 TcpNioServerConnectionFactory,您将需要额外的逻辑来为每个请求动态地将连接绑定(bind)到线程。

EDIT2

@SpringBootApplication
public class So51200675Application {

    public static void main(String[] args) {
        SpringApplication.run(So51200675Application.class, args).close();
    }

    @Bean
    public ApplicationRunner runner() {
        return args -> {
            Socket socket = SocketFactory.getDefault().createSocket("localhost", 1234);
            socket.getOutputStream().write("foo\r\n".getBytes());
            BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            System.out.println(reader.readLine());
            socket.close();
        };
    }

    @Bean
    public Map<String, String> fromToConnectionMappings() {
        return new ConcurrentHashMap<>();
    }

    @Bean
    public Map<String, String> toFromConnectionMappings() {
        return new ConcurrentHashMap<>();
    }

    @Bean
    public IntegrationFlow proxyInboundFlow() {
        return IntegrationFlows.from(Tcp.inboundAdapter(serverFactory()))
                .transform(Transformers.objectToString())
                .<String, String>transform(s -> s.toUpperCase())
                .handle((p, h) -> {
                    mapConnectionIds(h);
                    return p;
                })
                .handle(Tcp.outboundAdapter(threadConnectionFactory()))
                .get();
    }

    @Bean
    public IntegrationFlow proxyOutboundFlow() {
        return IntegrationFlows.from(Tcp.inboundAdapter(threadConnectionFactory()))
                .transform(Transformers.objectToString())
                .<String, String>transform(s -> s.toUpperCase())
                .enrichHeaders(e -> e
                        .headerExpression(IpHeaders.CONNECTION_ID, "@toFromConnectionMappings.get(headers['"
                                + IpHeaders.CONNECTION_ID + "'])").defaultOverwrite(true))
                .handle(Tcp.outboundAdapter(serverFactory()))
                .get();
    }

    private void mapConnectionIds(Map<String, Object> h) {
        try {
            TcpConnection connection = threadConnectionFactory().getConnection();
            String mapping = toFromConnectionMappings().get(connection.getConnectionId());
            String incomingCID = (String) h.get(IpHeaders.CONNECTION_ID);
            if (mapping == null || !(mapping.equals(incomingCID))) {
                System.out.println("Adding new mapping " + incomingCID + " to " + connection.getConnectionId());
                toFromConnectionMappings().put(connection.getConnectionId(), incomingCID);
                fromToConnectionMappings().put(incomingCID, connection.getConnectionId());
            }
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Bean
    public ThreadAffinityClientConnectionFactory threadConnectionFactory() {
        return new ThreadAffinityClientConnectionFactory(clientFactory()) {

            @Override
            public boolean isSingleUse() {
                return false;
            }

        };
    }

    @Bean
    public AbstractServerConnectionFactory serverFactory() {
        return Tcp.netServer(1234).get();
    }

    @Bean
    public AbstractClientConnectionFactory clientFactory() {
        AbstractClientConnectionFactory clientFactory = Tcp.netClient("localhost", 1235).get();
        clientFactory.setSingleUse(true);
        return clientFactory;
    }

    @Bean
    public IntegrationFlow serverFlow() {
        return IntegrationFlows.from(Tcp.inboundGateway(Tcp.netServer(1235)))
                .transform(Transformers.objectToString())
                .<String, String>transform(p -> p + p)
                .get();
    }

    @Bean
    public ApplicationListener<TcpConnectionCloseEvent> closer() {
        return e -> {
            if (fromToConnectionMappings().containsKey(e.getConnectionId())) {
                String key = fromToConnectionMappings().remove(e.getConnectionId());
                toFromConnectionMappings().remove(key);
                System.out.println("Removed mapping " + e.getConnectionId() + " to " + key);
                threadConnectionFactory().releaseConnection();
            }
        };
    }

}

EDIT3

使用 MapJsonSerializer 对我来说效果很好。

@SpringBootApplication
public class So51200675Application {

    public static void main(String[] args) {
        SpringApplication.run(So51200675Application.class, args).close();
    }

    @Bean
    public ApplicationRunner runner() {
        return args -> {
            Socket socket = SocketFactory.getDefault().createSocket("localhost", 1234);
            socket.getOutputStream().write("{\"foo\":\"bar\"}\n".getBytes());
            BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            System.out.println(reader.readLine());
            socket.close();
        };
    }

    @Bean
    public Map<String, String> fromToConnectionMappings() {
        return new ConcurrentHashMap<>();
    }

    @Bean
    public Map<String, String> toFromConnectionMappings() {
        return new ConcurrentHashMap<>();
    }

    @Bean
    public MapJsonSerializer serializer() {
        return new MapJsonSerializer();
    }

    @Bean
    public IntegrationFlow proxyRequestFlow() {
        return IntegrationFlows.from(Tcp.inboundAdapter(serverFactory()))
                .<Map<String, String>, Map<String, String>>transform(m -> {
                    m.put("foo", m.get("foo").toUpperCase());
                    return m;
                })
                .handle((p, h) -> {
                    mapConnectionIds(h);
                    return p;
                })
                .handle(Tcp.outboundAdapter(threadConnectionFactory()))
                .get();
    }

    @Bean
    public IntegrationFlow proxyReplyFlow() {
        return IntegrationFlows.from(Tcp.inboundAdapter(threadConnectionFactory()))
                .<Map<String, String>, Map<String, String>>transform(m -> {
                    m.put("foo", m.get("foo").toLowerCase() + m.get("foo"));
                    return m;
                })
                .enrichHeaders(e -> e
                        .headerExpression(IpHeaders.CONNECTION_ID, "@toFromConnectionMappings.get(headers['"
                                + IpHeaders.CONNECTION_ID + "'])").defaultOverwrite(true))
                .handle(Tcp.outboundAdapter(serverFactory()))
                .get();
    }

    private void mapConnectionIds(Map<String, Object> h) {
        try {
            TcpConnection connection = threadConnectionFactory().getConnection();
            String mapping = toFromConnectionMappings().get(connection.getConnectionId());
            String incomingCID = (String) h.get(IpHeaders.CONNECTION_ID);
            if (mapping == null || !(mapping.equals(incomingCID))) {
                System.out.println("Adding new mapping " + incomingCID + " to " + connection.getConnectionId());
                toFromConnectionMappings().put(connection.getConnectionId(), incomingCID);
                fromToConnectionMappings().put(incomingCID, connection.getConnectionId());
            }
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Bean
    public ThreadAffinityClientConnectionFactory threadConnectionFactory() {
        return new ThreadAffinityClientConnectionFactory(clientFactory()) {

            @Override
            public boolean isSingleUse() {
                return false;
            }

        };
    }

    @Bean
    public AbstractServerConnectionFactory serverFactory() {
        return Tcp.netServer(1234)
                .serializer(serializer())
                .deserializer(serializer())
                .get();
    }

    @Bean
    public AbstractClientConnectionFactory clientFactory() {
        AbstractClientConnectionFactory clientFactory = Tcp.netClient("localhost", 1235)
                .serializer(serializer())
                .deserializer(serializer())
                .get();
        clientFactory.setSingleUse(true);
        return clientFactory;
    }

    @Bean
    public IntegrationFlow backEndEmulatorFlow() {
        return IntegrationFlows.from(Tcp.inboundGateway(Tcp.netServer(1235)
                    .serializer(serializer())
                    .deserializer(serializer())))
                .<Map<String, String>, Map<String, String>>transform(m -> {
                    m.put("foo", m.get("foo") + m.get("foo"));
                    return m;
                })
                .get();
    }

    @Bean
    public ApplicationListener<TcpConnectionCloseEvent> closer() {
        return e -> {
            if (fromToConnectionMappings().containsKey(e.getConnectionId())) {
                String key = fromToConnectionMappings().remove(e.getConnectionId());
                toFromConnectionMappings().remove(key);
                System.out.println("Removed mapping " + e.getConnectionId() + " to " + key);
                threadConnectionFactory().releaseConnection();
            }
        };
    }

}

添加新映射 localhost:56998:1234:55c822a4-4252-45e6-9ef2-79263391f4be 到 localhost:1235:56999:3d520ca9-2f3a-44c3-b05f-e59695b8c1b0 {“foo”:“barbarBARBAR”} 删除映射 localhost:56998:1234:55c822a4-4252-45e6-9ef2-79263391f4be 到 localhost:1235:56999:3d520ca9-2f3a-44c3-b05f-e59695b8c1b0

关于java - Spring 集成 : TCP Client/Server opening one client connection per server connection,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51200675/

有关java - Spring 集成 : TCP Client/Server opening one client connection per server connection的更多相关文章

  1. java - 等价于 Java 中的 Ruby Hash - 2

    我真的很习惯使用Ruby编写以下代码:my_hash={}my_hash['test']=1Java中对应的数据结构是什么? 最佳答案 HashMapmap=newHashMap();map.put("test",1);我假设? 关于java-等价于Java中的RubyHash,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/22737685/

  2. ruby-on-rails - 带 Spring 锁的 Rails 4 控制台 - 2

    我正在使用Ruby2.1.1和Rails4.1.0.rc1。当执行railsc时,它被锁定了。使用Ctrl-C停止,我得到以下错误日志:~/.rvm/gems/ruby-2.1.1/gems/spring-1.1.2/lib/spring/client/run.rb:47:in`gets':Interruptfrom~/.rvm/gems/ruby-2.1.1/gems/spring-1.1.2/lib/spring/client/run.rb:47:in`verify_server_version'from~/.rvm/gems/ruby-2.1.1/gems/spring-1.1.

  3. ruby-on-rails - 如何使辅助方法在 Rails 集成测试中可用? - 2

    我在app/helpers/sessions_helper.rb中有一个帮助程序文件,其中包含一个方法my_preference,它返回当前登录用户的首选项。我想在集成测试中访问该方法。例如,这样我就可以在测试中使用getuser_path(my_preference)。在其他帖子中,我读到这可以通过在测试文件中包含requiresessions_helper来实现,但我仍然收到错误NameError:undefinedlocalvariableormethod'my_preference'.我做错了什么?require'test_helper'require'sessions_hel

  4. ruby-on-rails - 我如何将 Hoptoad 与 DelayedJob 和 DaemonSpawn 集成? - 2

    我一直很高兴地使用DelayedJob习惯用法:foo.send_later(:bar)这会调用DelayedJob进程中对象foo的方法bar。我一直在使用DaemonSpawn在我的服务器上启动DelayedJob进程。但是...如果foo抛出异常,Hoptoad不会捕获它。这是任何这些包中的错误...还是我需要更改某些配置...或者我是否需要在DS或DJ中插入一些异常处理来调用Hoptoad通知程序?回应下面的第一条评论。classDelayedJobWorker 最佳答案 尝试monkeypatchingDelayed::W

  5. java - 从 JRuby 调用 Java 类的问题 - 2

    我正在尝试使用boilerpipe来自JRuby。我看过guide从JRuby调用Java,并成功地将它与另一个Java包一起使用,但无法弄清楚为什么同样的东西不能用于boilerpipe。我正在尝试基本上从JRuby中执行与此Java等效的操作:URLurl=newURL("http://www.example.com/some-location/index.html");Stringtext=ArticleExtractor.INSTANCE.getText(url);在JRuby中试过这个:require'java'url=java.net.URL.new("http://www

  6. java - 我的模型类或其他类中应该有逻辑吗 - 2

    我只想对我一直在思考的这个问题有其他意见,例如我有classuser_controller和classuserclassUserattr_accessor:name,:usernameendclassUserController//dosomethingaboutanythingaboutusersend问题是我的User类中是否应该有逻辑user=User.newuser.do_something(user1)oritshouldbeuser_controller=UserController.newuser_controller.do_something(user1,user2)我

  7. java - 什么相当于 ruby​​ 的 rack 或 python 的 Java wsgi? - 2

    什么是ruby​​的rack或python的Java的wsgi?还有一个路由库。 最佳答案 来自Python标准PEP333:Bycontrast,althoughJavahasjustasmanywebapplicationframeworksavailable,Java's"servlet"APImakesitpossibleforapplicationswrittenwithanyJavawebapplicationframeworktoruninanywebserverthatsupportstheservletAPI.ht

  8. Observability:从零开始创建 Java 微服务并监控它 (二) - 2

    这篇文章是继上一篇文章“Observability:从零开始创建Java微服务并监控它(一)”的续篇。在上一篇文章中,我们讲述了如何创建一个Javaweb应用,并使用Filebeat来收集应用所生成的日志。在今天的文章中,我来详述如何收集应用的指标,使用APM来监控应用并监督web服务的在线情况。源码可以在地址 https://github.com/liu-xiao-guo/java_observability 进行下载。摄入指标指标被视为可以随时更改的时间点值。当前请求的数量可以改变任何毫秒。你可能有1000个请求的峰值,然后一切都回到一个请求。这也意味着这些指标可能不准确,你还想提取最小/

  9. 【Java 面试合集】HashMap中为什么引入红黑树,而不是AVL树呢 - 2

    HashMap中为什么引入红黑树,而不是AVL树呢1.概述开始学习这个知识点之前我们需要知道,在JDK1.8以及之前,针对HashMap有什么不同。JDK1.7的时候,HashMap的底层实现是数组+链表JDK1.8的时候,HashMap的底层实现是数组+链表+红黑树我们要思考一个问题,为什么要从链表转为红黑树呢。首先先让我们了解下链表有什么不好???2.链表上述的截图其实就是链表的结构,我们来看下链表的增删改查的时间复杂度增:因为链表不是线性结构,所以每次添加的时候,只需要移动一个节点,所以可以理解为复杂度是N(1)删:算法时间复杂度跟增保持一致查:既然是非线性结构,所以查询某一个节点的时候

  10. jenkins部署1--jenkins+gitee持续集成 - 2

    前置步骤我们都操作完了,这篇开始介绍jenkins的集成。话不多说,看操作1、登录进入jenkins后会让你选择安装插件,选择第一个默认的就行。安装完成后设置账号密码,重新登录。2、配置JDK和Git都需要执行路径,所以需要先把执行路径找到,先进入服务器的docker容器,2.1JDK的路径root@69eef9ee86cf:/usr/bin#echo$JAVA_HOME/usr/local/openjdk-82.2Git的路径root@69eef9ee86cf:/#whichgit/usr/bin/git3、先配置JDK和Git。点击:ManageJenkins>>GlobalToolCon

随机推荐