草庐IT

java - Netty Nio 中 promise 的异步更新

coder 2024-03-03 原文

我有一个交换信息的服务器和客户端架构。我想从服务器返回已连接 channel 的数量。我想使用 promise 将服务器的消息返回给客户端。我的代码是:

public static void callBack () throws Exception{

   String host = "localhost";
   int port = 8080;

   try {
       Bootstrap b = new Bootstrap();
       b.group(workerGroup);
       b.channel(NioSocketChannel.class);
       b.option(ChannelOption.SO_KEEPALIVE, true);
       b.handler(new ChannelInitializer<SocketChannel>() {
        @Override
           public void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast(new RequestDataEncoder(), new ResponseDataDecoder(), new ClientHandler(promise));
           }
       });
       ChannelFuture f = b.connect(host, port).sync();
       //f.channel().closeFuture().sync();
   }
   finally {
    //workerGroup.shutdownGracefully();
   }
}

public static void main(String[] args) throws Exception {

  callBack();
  while (true) {

    Object msg = promise.get();
    System.out.println("The number if the connected clients is not two");
    int ret = Integer.parseInt(msg.toString());
    if (ret == 2){
        break;
    }
  }
  System.out.println("The number if the connected clients is two");
}

当我运行一个客户端时,它总是收到消息 The number if the connected clients is not two 并且返回的数字始终是一个。当我运行第二个客户端时,它总是接收到返回值 2,但是,第一个客户端仍然接收到一个。对于第一个客户的情况,我找不到更新 promise 的正确方法。

编辑: 客户端服务器:

public class ClientHandler extends ChannelInboundHandlerAdapter {
  public final Promise<Object> promise;
  public ClientHandler(Promise<Object> promise) {
      this.promise = promise;
  }

  @Override
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
      RequestData msg = new RequestData();
      msg.setIntValue(123);
      msg.setStringValue("all work and no play makes jack a dull boy");
      ctx.writeAndFlush(msg);
  }

  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
      System.out.println(msg);
      promise.trySuccess(msg);
  }
} 

客户端处理程序的代码将从服务器接收到的消息存储到 promise。

最佳答案

使用 Netty 框架,一个 Promise和一个 Future是一次写入的对象,这个原则使它们更容易在多线程环境中使用。

既然一个Promise不能做你想要的,我们需要看看其他技术是否适合你的条件,你的条件基本上归结为:

  • 从多个线程读取
  • 仅从单个线程写入(因为在 Netty channel 内,read 方法只能同时由 1 个线程执行,除非 channel 被标记为可共享)

对于这些要求,最合适的匹配是 volatile 变量,因为它对于读取是线程安全的,并且可以由 1 个线程安全地更新,而不必担心写入顺序。

要更新您的代码以使用 volatile 变量,需要进行一些修改,因为我们无法轻松地将引用链接传递到函数内的变量,但我们必须传递一个更新后端变量的函数。

private static volatile int connectedClients = 0;
public static void callBack () throws Exception{
    //....
           ch.pipeline().addLast(new RequestDataEncoder(), new ResponseDataDecoder(),
                                 new ClientHandler(i -> {connectedClients = i;});
    //....
}

public static void main(String[] args) throws Exception {

  callBack();
  while (true) {
    System.out.println("The number if the connected clients is not two");
    int ret = connectedClients;
    if (ret == 2){
        break;
    }
  }
  System.out.println("The number if the connected clients is two");
}

public class ClientHandler extends ChannelInboundHandlerAdapter {
  public final IntConsumer update;
  public ClientHandler(IntConsumer update) {
      this.update = update;
  }

  @Override
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
      RequestData msg = new RequestData();
      msg.setIntValue(123);
      msg.setStringValue("all work and no play makes jack a dull boy");
      ctx.writeAndFlush(msg);
  }

  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
      System.out.println(msg);
      update.accept(Integer.parseInt(msg));
  }
} 

虽然上面的方法应该可行,但我们很快发现主类中的 while 循环占用了大量 CPU 时间,这可能会影响本地客户端系统的其他部分,幸运的是,如果我们这样做,这个问题也是可以解决的向系统中添加其他部分,即同步。通过将 connectedClients 的初始读取留在同步块(synchronized block)之外,我们仍然可以在“真”情况下从快速读取中获益,而在“假”情况下,我们可以可用于系统其他部分的安全重要 CPU 周期。

为了解决这个问题,我们在阅读时使用以下步骤:

  1. connectedClients 的值存储在一个单独的变量中
  2. 将这个变量与目标值进行比较
  3. 如果为真,则尽早跳出循环
  4. 如果为 false,则进入同步块(synchronized block)
  5. 开始 while true 循环
  6. 再次读出变量,因为现在值可能会改变
  7. 检查条件,如果条件正确则中断
  8. 如果不是,等待值的变化

以及写作时的以下内容:

  1. 同步
  2. 更新值
  3. 唤醒所有其他等待这个值的线程

这可以用如下代码实现:

private static volatile int connectedClients = 0;
private static final Object lock = new Object();
public static void callBack () throws Exception{
    //....
           ch.pipeline().addLast(new RequestDataEncoder(), new ResponseDataDecoder(),
                                 new ClientHandler(i -> {
               synchronized (lock) {
                   connectedClients = i;
                   lock.notifyAll();
               }
           });
    //....
}

public static void main(String[] args) throws Exception {

  callBack();
  int connected = connectedClients;
  if (connected != 2) {
      System.out.println("The number if the connected clients is not two before locking");
      synchronized (lock) {
          while (true) {
              connected = connectedClients;
              if (connected == 2)
                  break;
              System.out.println("The number if the connected clients is not two");
              lock.wait();
          }
      }
  }
  System.out.println("The number if the connected clients is two: " + connected );
}

服务器端变化

但是,并非所有问题都与客户端有关。

因为您发布了指向您的 github 存储库的链接,所以当新人加入时,您永远不会将请求从服务器发送回旧客户端。因为没有这样做,所以永远不会通知客户有关更改,请确保也这样做。

关于java - Netty Nio 中 promise 的异步更新,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47015728/

有关java - Netty Nio 中 promise 的异步更新的更多相关文章

  1. ruby-on-rails - 如何验证 update_all 是否实际在 Rails 中更新 - 2

    给定这段代码defcreate@upgrades=User.update_all(["role=?","upgraded"],:id=>params[:upgrade])redirect_toadmin_upgrades_path,:notice=>"Successfullyupgradeduser."end我如何在该操作中实际验证它们是否已保存或未重定向到适当的页面和消息? 最佳答案 在Rails3中,update_all不返回任何有意义的信息,除了已更新的记录数(这可能取决于您的DBMS是否返回该信息)。http://ar.ru

  2. ruby-on-rails - 如何在 ruby​​ 中使用两个参数异步运行 exe? - 2

    exe应该在我打开页面时运行。异步进程需要运行。有什么方法可以在ruby​​中使用两个参数异步运行exe吗?我已经尝试过ruby​​命令-system()、exec()但它正在等待过程完成。我需要用参数启动exe,无需等待进程完成是否有任何ruby​​gems会支持我的问题? 最佳答案 您可以使用Process.spawn和Process.wait2:pid=Process.spawn'your.exe','--option'#Later...pid,status=Process.wait2pid您的程序将作为解释器的子进程执行。除

  3. ruby-on-rails - 使用 rails 4 设计而不更新用户 - 2

    我将应用程序升级到Rails4,一切正常。我可以登录并转到我的编辑页面。也更新了观点。使用标准View时,用户会更新。但是当我添加例如字段:name时,它​​不会在表单中更新。使用devise3.1.1和gem'protected_attributes'我需要在设备或数据库上运行某种更新命令吗?我也搜索过这个地方,找到了许多不同的解决方案,但没有一个会更新我的用户字段。我没有添加任何自定义字段。 最佳答案 如果您想允许额外的参数,您可以在ApplicationController中使用beforefilter,因为Rails4将参数

  4. java - 等价于 Java 中的 Ruby Hash - 2

    我真的很习惯使用Ruby编写以下代码:my_hash={}my_hash['test']=1Java中对应的数据结构是什么? 最佳答案 HashMapmap=newHashMap();map.put("test",1);我假设? 关于java-等价于Java中的RubyHash,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/22737685/

  5. java - 从 JRuby 调用 Java 类的问题 - 2

    我正在尝试使用boilerpipe来自JRuby。我看过guide从JRuby调用Java,并成功地将它与另一个Java包一起使用,但无法弄清楚为什么同样的东西不能用于boilerpipe。我正在尝试基本上从JRuby中执行与此Java等效的操作:URLurl=newURL("http://www.example.com/some-location/index.html");Stringtext=ArticleExtractor.INSTANCE.getText(url);在JRuby中试过这个:require'java'url=java.net.URL.new("http://www

  6. java - 我的模型类或其他类中应该有逻辑吗 - 2

    我只想对我一直在思考的这个问题有其他意见,例如我有classuser_controller和classuserclassUserattr_accessor:name,:usernameendclassUserController//dosomethingaboutanythingaboutusersend问题是我的User类中是否应该有逻辑user=User.newuser.do_something(user1)oritshouldbeuser_controller=UserController.newuser_controller.do_something(user1,user2)我

  7. java - 什么相当于 ruby​​ 的 rack 或 python 的 Java wsgi? - 2

    什么是ruby​​的rack或python的Java的wsgi?还有一个路由库。 最佳答案 来自Python标准PEP333:Bycontrast,althoughJavahasjustasmanywebapplicationframeworksavailable,Java's"servlet"APImakesitpossibleforapplicationswrittenwithanyJavawebapplicationframeworktoruninanywebserverthatsupportstheservletAPI.ht

  8. Observability:从零开始创建 Java 微服务并监控它 (二) - 2

    这篇文章是继上一篇文章“Observability:从零开始创建Java微服务并监控它(一)”的续篇。在上一篇文章中,我们讲述了如何创建一个Javaweb应用,并使用Filebeat来收集应用所生成的日志。在今天的文章中,我来详述如何收集应用的指标,使用APM来监控应用并监督web服务的在线情况。源码可以在地址 https://github.com/liu-xiao-guo/java_observability 进行下载。摄入指标指标被视为可以随时更改的时间点值。当前请求的数量可以改变任何毫秒。你可能有1000个请求的峰值,然后一切都回到一个请求。这也意味着这些指标可能不准确,你还想提取最小/

  9. 【Java 面试合集】HashMap中为什么引入红黑树,而不是AVL树呢 - 2

    HashMap中为什么引入红黑树,而不是AVL树呢1.概述开始学习这个知识点之前我们需要知道,在JDK1.8以及之前,针对HashMap有什么不同。JDK1.7的时候,HashMap的底层实现是数组+链表JDK1.8的时候,HashMap的底层实现是数组+链表+红黑树我们要思考一个问题,为什么要从链表转为红黑树呢。首先先让我们了解下链表有什么不好???2.链表上述的截图其实就是链表的结构,我们来看下链表的增删改查的时间复杂度增:因为链表不是线性结构,所以每次添加的时候,只需要移动一个节点,所以可以理解为复杂度是N(1)删:算法时间复杂度跟增保持一致查:既然是非线性结构,所以查询某一个节点的时候

  10. 【Java入门】使用Java实现文件夹的遍历 - 2

    遍历文件夹我们通常是使用递归进行操作,这种方式比较简单,也比较容易理解。本文为大家介绍另一种不使用递归的方式,由于没有使用递归,只用到了循环和集合,所以效率更高一些!一、使用递归遍历文件夹整体思路1、使用File封装初始目录,2、打印这个目录3、获取这个目录下所有的子文件和子目录的数组。4、遍历这个数组,取出每个File对象4-1、如果File是否是一个文件,打印4-2、否则就是一个目录,递归调用代码实现publicclassSearchFile{publicstaticvoidmain(String[]args){//初始目录Filedir=newFile("d:/Dev");Datebeg

随机推荐