草庐IT

java - Webflux websocketclient,如何在同一个 session 中发送多个请求[设计客户端库]

coder 2024-03-03 原文

TL; 博士;

我们正在尝试使用 spring webflux WebSocket 实现设计一个 WebSocket 服务器。服务器具有通常的 HTTP 服务器操作,例如create/fetch/update/fetchall .使用 WebSockets,我们试图公开一个端点,以便客户端可以利用单个连接进行所有类型的操作,因为 WebSockets 就是为此目的而设计的。 webflux 和 WebSockets 的设计是否正确?

长版

我们正在启动一个项目,该项目将使用来自 spring-webflux 的响应式(Reactive) Web 套接字。 .我们需要构建一个响应式(Reactive)客户端库,消费者可以使用它来连接到服务器。

在服务器上,我们收到一个请求,读取一条消息,保存它并返回一个静态响应:

public Mono<Void> handle(WebSocketSession webSocketSession) {
    Flux<WebSocketMessage> response = webSocketSession.receive()
            .map(WebSocketMessage::retain)
            .concatMap(webSocketMessage -> Mono.just(webSocketMessage)
                    .map(parseBinaryToEvent) //logic to get domain object
                    .flatMap(e -> service.save(e))
                    .thenReturn(webSocketSession.textMessage(SAVE_SUCCESSFUL))
            );

    return webSocketSession.send(response);
}

在客户端 , 我们想在有人来电时拨打save方法并返回来自 server 的响应.
public Mono<String> save(Event message) {
    new ReactorNettyWebSocketClient().execute(uri, session -> {
      session
              .send(Mono.just(session.binaryMessage(formatEventToMessage)))
              .then(session.receive()
                      .map(WebSocketMessage::getPayloadAsText)
                      .doOnNext(System.out::println).then()); //how to return this to client
    });
    return null;
}

我们不确定,如何设计这个。理想情况下,我们认为应该有

1) client.execute应该只调用一次并以某种方式保持 session .应该使用相同的 session 在后续调用中发送数据。

2) 如何从服务器返回我们在session.receive 中得到的响应?

3) 万一fetch怎么办?当 session.receive 中的响应很大时(不仅仅是静态字符串,而是事件列表) ?

我们正在做一些研究,但我们无法在线找到 webflux-websocket-client 文档/实现的合适资源。关于如何前进的任何指示。

最佳答案

请!使用 RSocket !

这是绝对正确的设计,值得为所有可能的操作节省资源并为每个客户端仅使用一个连接。

但是,不要实现轮子并使用为您提供所有这些类型通信的协议(protocol)。

  • RSocket 有一个请求-响应模型,它允许您进行当今最常见的客户端-服务器交互。
  • RSocket 具有请求流通信模型,因此您可以满足所有需求并异步重用相同的连接返回事件流。 RSocket 将所有逻辑流映射到物理连接并返回,因此您不会感到自己这样做的痛苦。
  • RSocket 有更多的交互模型,例如
    在以下情况下可能有用的即发即忘和流流
    以两种方式发送数据流。

  • 如何在 Spring 中使用 RSocket

    这样做的选项之一是使用 RSocket 协议(protocol)的 RSocket-Java 实现。 RSocket-Java 建立在 Project Reactor 之上,所以它自然适合 Spring WebFlux 生态系统。

    不幸的是,没有与 Spring 生态系统的特色集成。幸运的是,我花了几个小时提供了一个简单的 RSocket Spring Boot Starter将 Spring WebFlux 与 RSocket 集成在一起,并将 WebSocket RSocket 服务器与 WebFlux Http 服务器一起公开。

    为什么 RSocket 是更好的方法?

    基本上,RSocket 隐藏了自己实现相同方法的复杂性。使用 RSocket,我们不必关心作为自定义协议(protocol)和 Java 实现的交互模型定义。 RSocket 为我们将数据传送到特定的逻辑 channel 。它提供了一个内置客户端,可以将消息发送到同一个 WS 连接,因此我们不必为此发明自定义实现。

    使用 RSocket-RPC 让它变得更好

    由于 RSocket 只是一个协议(protocol),它不提供任何消息格式,所以这个挑战是针对业务逻辑的。然而,有一个 RSocket-RPC 项目提供了 Protocol Buffer 作为消息格式,并重用了与 GRPC 相同的代码生成技术。因此,使用 RSocket-RPC 我们可以轻松地为客户端和服务器构建 API,而无需关心传输和协议(protocol)抽象。

    同样的 RSocket Spring Boot 集成提供了一个 example RSocket-RPC 的使用也是如此。

    好吧,它没有说服我,我仍然想要一个自定义的 WebSocket 服务器

    所以,为了这个目的,你必须自己实现那个 hell 。我之前已经做过一次,但我不能指出那个项目,因为它是一个企业项目。
    不过,我可以分享一些代码示例,它们可以帮助您构建合适的客户端和服务器。

    服务器端

    处理程序和开放逻辑订阅者映射

    必须考虑的第一点是,一个物理连接中的所有逻辑流都应该存储在某个地方:
    class MyWebSocketRouter implements WebSocketHandler {
    
      final Map<String, EnumMap<ActionMessage.Type, ChannelHandler>> channelsMapping;
    
    
      @Override
      public Mono<Void> handle(WebSocketSession session) {
        final Map<String, Disposable> channelsIdsToDisposableMap = new HashMap<>();
        ...
      }
    }
    

    上面的示例中有两张 map 。第一个是你的路由映射,它允许你根据传入的消息参数来识别路由,等等。第二个是为请求流用例创建的(在我的例子中它是 Activity 订阅的映射),所以你可以发送一个消息帧来创建一个订阅,或者让你订阅一个特定的 Action 并保持这个订阅,所以一旦取消订阅执行操作,如果订阅存在,您将被取消订阅。

    使用 Processor 进行消息复用

    为了从所有逻辑流发回消息,您必须将消息多路复用到一个流。例如,使用 Reactor,您可以使用 UnicastProcessor 来做到这一点。 :
    @Override
    public Mono<Void> handle(WebSocketSession session) {
      final UnicastProcessor<ResponseMessage<?>> funIn = UnicastProcessor.create(Queues.<ResponseMessage<?>>unboundedMultiproducer().get());
      ...
    
      return Mono
        .subscriberContext()
        .flatMap(context -> Flux.merge(
          session
            .receive()
            ...
            .cast(ActionMessage.class)
            .publishOn(Schedulers.parallel())
            .doOnNext(am -> {
              switch (am.type) {
                case CREATE:
                case UPDATE:
                case CANCEL: {
                  ...
                }
                case SUBSCRIBE: {
                  Flux<ResponseMessage<?>> flux = Flux
                    .from(
                      channelsMapping.get(am.getChannelId())
                                     .get(ActionMessage.Type.SUBSCRIBE)
                                     .handle(am) // returns Publisher<>
                    );
    
                  if (flux != null) {
                    channelsIdsToDisposableMap.compute(
                      am.getChannelId() + am.getSymbol(), // you can generate a uniq uuid on the client side if needed
                      (cid, disposable) -> {
                        ...
    
                        return flux
                          .subscriberContext(context)
                          .subscribe(
                            funIn::onNext, // send message to a Processor manually
                            e -> {
                              funIn.onNext(
                                new ResponseMessage<>( // send errors as a messages to Processor here
                                  0,
                                  e.getMessage(),
                                  ...
                                  ResponseMessage.Type.ERROR
                                )
                              );
                            }
                          );
                      }
                    );
                  }
    
                  return;
                }
                case UNSABSCRIBE: {
                  Disposable disposable = channelsIdsToDisposableMap.get(am.getChannelId() + am.getSymbol());
    
                  if (disposable != null) {
                    disposable.dispose();
                  }
                }
              }
            })
            .then(Mono.empty()),
    
            funIn
                ...
                .map(p -> new WebSocketMessage(WebSocketMessage.Type.TEXT, p))
                .as(session::send)
          ).then()
        );
    }
    

    正如我们从上面的示例中看到的那样,那里有很多东西:
  • 该消息应包含路由信息
  • 该消息应包含与其相关的唯一流 ID。
  • 用于消息多路复用的单独处理器,其中错误也应该是消息
  • 每个 channel 都应该存储在某个地方,在这种情况下,我们有一个简单的用例,其中每条消息都可以提供一个 Flux消息或只是 Mono (在单声道的情况下,它可以在服务器端更简单地实现,因此您不必保留唯一的流 ID)。
  • 此示例不包括消息编码-解码,因此这个挑战留给您。

  • 客户端

    客户端也不是那么简单:

    处理 session

    为了处理连接,我们必须分配两个处理器,以便我们可以进一步使用它们来复用和解复用消息:
    UnicastProcessor<> outgoing = ...
    UnicastPorcessor<> incoming = ...
    (session) -> {
      return Flux.merge(
         session.receive()
                .subscribeWith(incoming)
                .then(Mono.empty()),
         session.send(outgoing)
      ).then();
    }
    

    将所有逻辑流保存在某处

    所有创建的流是否是MonoFlux应该存储在某个地方,以便我们能够区分与哪个流消息相关:
    Map<String, MonoSink> monoSinksMap = ...;
    Map<String, FluxSink> fluxSinksMap = ...;
    

    自 MonoSink 以来,我们必须保留两个映射,而 FluxSink 没有相同的父接口(interface)。

    消息路由

    在上面的示例中,我们只考虑了客户端的初始部分。现在我们要构建一个消息路由机制:
    ...
    .subscribeWith(incoming)
    .doOnNext(message -> {
        if (monoSinkMap.containsKey(message.getStreamId())) {
            MonoSink sink = monoSinkMap.get(message.getStreamId());
            monoSinkMap.remove(message.getStreamId());
            if (message.getType() == SUCCESS) {
                sink.success(message.getData());
            }
            else {
                sink.error(message.getCause());
            }
        } else if (fluxSinkMap.containsKey(message.getStreamId())) {
            FluxSink sink = fluxSinkMap.get(message.getStreamId());
            if (message.getType() == NEXT) {
                sink.next(message.getData());
            }
            else if (message.getType() == COMPLETE) {
                fluxSinkMap.remove(message.getStreamId());
                sink.next(message.getData());
                sink.complete();
            }
            else {
                fluxSinkMap.remove(message.getStreamId());
                sink.error(message.getCause());
            }
        }
    })
    

    上面的代码示例展示了我们如何路由传入的消息。

    多路请求

    最后一部分是消息复用。为此,我们将介绍可能的发送者类实现:
    class Sender {
        UnicastProcessor<> outgoing = ...
        UnicastPorcessor<> incoming = ...
    
        Map<String, MonoSink> monoSinksMap = ...;
        Map<String, FluxSink> fluxSinksMap = ...;
    
        public Sender () {
    

    //这里创建 websocket 连接并放前面提到的代码
    }
        Mono<R> sendForMono(T data) {
            //generate message with unique 
            return Mono.<R>create(sink -> {
                monoSinksMap.put(streamId, sink);
                outgoing.onNext(message); // send message to server only when subscribed to Mono
            });
        }
    
         Flux<R> sendForFlux(T data) {
             return Flux.<R>create(sink -> {
                fluxSinksMap.put(streamId, sink);
                outgoing.onNext(message); // send message to server only when subscribed to Flux
            });
         }
    }
    

    自定义实现总结
  • 铁杆
  • 没有实现背压支持,这可能是另一个挑战
  • 轻松射中自己的脚

  • 外卖
  • 请使用 RSocket,不要自己发明协议(protocol),这很难!!!
  • 要从 Pivotal 人员那里了解有关 RSocket 的更多信息 - https://www.youtube.com/watch?v=WVnAbv65uCU
  • 从我的一次演讲中了解有关 RSocket 的更多信息 - https://www.youtube.com/watch?v=XKMyj6arY2A
  • 有一个基于 RSocket 的特色框架,称为 Proteus - 您可能对此感兴趣 - https://www.netifi.com/
  • 从 RSocket 协议(protocol)核心开发者处了解更多关于 Proteus 的信息 - https://www.google.com/url?sa=t&source=web&rct=j&url=https://m.youtube.com/watch%3Fv%3D_rqQtkIeNIQ&ved=2ahUKEwjpyLTpsLzfAhXDDiwKHUUUA8gQt9IBMAR6BAgNEB8&usg=AOvVaw0B_VdOj42gjr0YrzLLUX1E
  • 关于java - Webflux websocketclient,如何在同一个 session 中发送多个请求[设计客户端库],我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53812515/

    有关java - Webflux websocketclient,如何在同一个 session 中发送多个请求[设计客户端库]的更多相关文章

    1. ruby - 如何在 Ruby 中顺序创建 PI - 2

      出于纯粹的兴趣,我很好奇如何按顺序创建PI,而不是在过程结果之后生成数字,而是让数字在过程本身生成时显示。如果是这种情况,那么数字可以自行产生,我可以对以前看到的数字实现垃圾收集,从而创建一个无限系列。结果只是在Pi系列之后每秒生成一个数字。这是我通过互联网筛选的结果:这是流行的计算机友好算法,类机器算法:defarccot(x,unity)xpow=unity/xn=1sign=1sum=0loopdoterm=xpow/nbreakifterm==0sum+=sign*(xpow/n)xpow/=x*xn+=2sign=-signendsumenddefcalc_pi(digits

    2. ruby - 如何在 buildr 项目中使用 Ruby 代码? - 2

      如何在buildr项目中使用Ruby?我在很多不同的项目中使用过Ruby、JRuby、Java和Clojure。我目前正在使用我的标准Ruby开发一个模拟应用程序,我想尝试使用Clojure后端(我确实喜欢功能代码)以及JRubygui和测试套件。我还可以看到在未来的不同项目中使用Scala作为后端。我想我要为我的项目尝试一下buildr(http://buildr.apache.org/),但我注意到buildr似乎没有设置为在项目中使用JRuby代码本身!这看起来有点傻,因为该工具旨在统一通用的JVM语言并且是在ruby中构建的。除了将输出的jar包含在一个独特的、仅限ruby​​

    3. ruby - 什么是填充的 Base64 编码字符串以及如何在 ruby​​ 中生成它们? - 2

      我正在使用的第三方API的文档状态:"[O]urAPIonlyacceptspaddedBase64encodedstrings."什么是“填充的Base64编码字符串”以及如何在Ruby中生成它们。下面的代码是我第一次尝试创建转换为Base64的JSON格式数据。xa=Base64.encode64(a.to_json) 最佳答案 他们说的padding其实就是Base64本身的一部分。它是末尾的“=”和“==”。Base64将3个字节的数据包编码为4个编码字符。所以如果你的输入数据有长度n和n%3=1=>"=="末尾用于填充n%

    4. ruby - 使用 Vim Rails,您可以创建一个新的迁移文件并一次性打开它吗? - 2

      使用带有Rails插件的vim,您可以创建一个迁移文件,然后一次性打开该文件吗?textmate也可以这样吗? 最佳答案 你可以使用rails.vim然后做类似的事情::Rgeneratemigratonadd_foo_to_bar插件将打开迁移生成的文件,这正是您想要的。我不能代表textmate。 关于ruby-使用VimRails,您可以创建一个新的迁移文件并一次性打开它吗?,我们在StackOverflow上找到一个类似的问题: https://sta

    5. ruby-on-rails - Rails - 一个 View 中的多个模型 - 2

      我需要从一个View访问多个模型。以前,我的links_controller仅用于提供以不同方式排序的链接资源。现在我想包括一个部分(我假设)显示按分数排序的顶级用户(@users=User.all.sort_by(&:score))我知道我可以将此代码插入每个链接操作并从View访问它,但这似乎不是“ruby方式”,我将需要在不久的将来访问更多模型。这可能会变得很脏,是否有针对这种情况的任何技术?注意事项:我认为我的应用程序正朝着单一格式和动态页面内容的方向发展,本质上是一个典型的网络应用程序。我知道before_filter但考虑到我希望应用程序进入的方向,这似乎很麻烦。最终从任何

    6. ruby-on-rails - 渲染另一个 Controller 的 View - 2

      我想要做的是有2个不同的Controller,client和test_client。客户端Controller已经构建,我想创建一个test_clientController,我可以使用它来玩弄客户端的UI并根据需要进行调整。我主要是想绕过我在客户端中内置的验证及其对加载数据的管理Controller的依赖。所以我希望test_clientController加载示例数据集,然后呈现客户端Controller的索引View,以便我可以调整客户端UI。就是这样。我在test_clients索引方法中试过这个:classTestClientdefindexrender:template=>

    7. ruby-on-rails - 如何在 ruby​​ 中使用两个参数异步运行 exe? - 2

      exe应该在我打开页面时运行。异步进程需要运行。有什么方法可以在ruby​​中使用两个参数异步运行exe吗?我已经尝试过ruby​​命令-system()、exec()但它正在等待过程完成。我需要用参数启动exe,无需等待进程完成是否有任何ruby​​gems会支持我的问题? 最佳答案 您可以使用Process.spawn和Process.wait2:pid=Process.spawn'your.exe','--option'#Later...pid,status=Process.wait2pid您的程序将作为解释器的子进程执行。除

    8. ruby - Highline 询问方法不会使用同一行 - 2

      设置:狂欢ruby1.9.2高线(1.6.13)描述:我已经相当习惯在其他一些项目中使用highline,但已经有几个月没有使用它了。现在,在Ruby1.9.2上全新安装时,它似乎不允许在同一行回答提示。所以以前我会看到类似的东西:require"highline/import"ask"Whatisyourfavoritecolor?"并得到:Whatisyourfavoritecolor?|现在我看到类似的东西:Whatisyourfavoritecolor?|竖线(|)符号是我的终端光标。知道为什么会发生这种变化吗? 最佳答案

    9. ruby - 如何在续集中重新加载表模式? - 2

      鉴于我有以下迁移:Sequel.migrationdoupdoalter_table:usersdoadd_column:is_admin,:default=>falseend#SequelrunsaDESCRIBEtablestatement,whenthemodelisloaded.#Atthispoint,itdoesnotknowthatusershaveais_adminflag.#Soitfails.@user=User.find(:email=>"admin@fancy-startup.example")@user.is_admin=true@user.save!ende

    10. ruby - 如何在 Ruby 中拆分参数字符串 Bash 样式? - 2

      我正在为一个项目制作一个简单的shell,我希望像在Bash中一样解析参数字符串。foobar"helloworld"fooz应该变成:["foo","bar","helloworld","fooz"]等等。到目前为止,我一直在使用CSV::parse_line,将列分隔符设置为""和.compact输出。问题是我现在必须选择是要支持单引号还是双引号。CSV不支持超过一个分隔符。Python有一个名为shlex的模块:>>>shlex.split("Test'helloworld'foo")['Test','helloworld','foo']>>>shlex.split('Test"

    随机推荐