草庐IT

java - Netty channel 写入未到达处理程序

coder 2023-09-19 原文

我正在学习 Netty 并制作一个通过 TCP 发送对象的简单应用程序的原型(prototype)。我的问题是,当我使用消息从服务器端调用 Channel.write 时,它似乎没有到达管道中的处理程序。当我从客户端向服务器发送消息时,它按预期工作。

这是代码。

服务器:

public class Main {     
    private int serverPort;

    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;

    private ServerBootstrap boot;
    private ChannelFuture future;

    private SomeDataChannelDuplexHandler duplex;

    private Channel ch;

    public Main(int serverPort) {
        this.serverPort = serverPort;
    }

    public void initialise() {      
        boot = new ServerBootstrap();       
        bossGroup = new NioEventLoopGroup();
        workerGroup = new NioEventLoopGroup();

        boot.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast("idleStateHandler", new IdleStateHandler(0, 0, 2));

                    // Inbound
                    ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 0));
                    ch.pipeline().addLast(new SomeDataDecoder());

                    // Outbound
                    ch.pipeline().addLast(new LengthFieldPrepender(2));
                    ch.pipeline().addLast(new SomeDataEncoder()); 

                    // In-Out
                    ch.pipeline().addLast(new SomeDataChannelDuplexHandler());
                }
            })      
            .option(ChannelOption.SO_BACKLOG, 128)
            .childOption(ChannelOption.SO_KEEPALIVE, true); 
    }

    public void sendMessage() { 
        SomeData fd = new SomeData("hello", "localhost", 1234);     
        ChannelFuture future = ch.writeAndFlush(fd);        
        future.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    System.out.println("send error: " + future.cause().toString());
                } else {
                    System.out.println("send message ok");  
                }
            }
        });
    }

    public void startServer(){
        try {
            future = boot.bind(serverPort)
                    .sync()
                    .addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            ch = future.channel();
                        }
            });
        } catch (InterruptedException e) {
            // log failure
        }
    }

    public void stopServer() {
        workerGroup.shutdownGracefully()
            .addListener(e -> System.out.println("workerGroup shutdown"));

        bossGroup.shutdownGracefully()
            .addListener(e -> System.out.println("bossGroup shutdown"));
    }

    public static void main(String[] args) throws InterruptedException {

        Main m = new Main(5000);

        m.initialise();
        m.startServer();

        final Scanner scanner = new Scanner(System.in);

        System.out.println("running.");

        while (true) {

            final String input = scanner.nextLine();

            if ("q".equals(input.trim())) {
                break;
            } else {
                m.sendMessage();
            }
        }

        scanner.close();
        m.stopServer();
    }
}

双工 channel 处理程序:

public class SomeDataChannelDuplexHandler extends ChannelDuplexHandler {

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println("duplex channel active");
        ctx.fireChannelActive();
    }

    @Override 
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 
        System.out.println("duplex channelRead");
        if (msg instanceof SomeData) {
            SomeData sd = (SomeData) msg;
            System.out.println("received: " + sd);
        } else {
            System.out.println("some other object");
        }
        ctx.fireChannelRead(msg);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.ALL_IDLE) { // idle for no read and write
                System.out.println("idle: " + event.state());
            }
        }
    }   
}

最后是编码器(解码器类似):

public class SomeDataEncoder extends MessageToByteEncoder<SomeData> {

    @Override
    protected void encode(ChannelHandlerContext ctx, SomeData msg, ByteBuf out) throws Exception {

        System.out.println("in encoder, msg = " + msg);
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);

        oos.writeObject(msg.getName());
        oos.writeObject(msg.getIp());
        oos.writeInt(msg.getPort());
        oos.close();

        byte[] serialized = bos.toByteArray();
        int size = serialized.length;

        ByteBuf encoded = ctx.alloc().buffer(size);
        encoded.writeBytes(bos.toByteArray());

        out.writeBytes(encoded);
    }
}

客户端:

public class Client {

    String host = "10.188.36.66";
    int port = 5000;

    EventLoopGroup workerGroup = new NioEventLoopGroup();
    ChannelFuture f;
    private Channel ch;

    public Client() {
    }

    public void startClient() throws InterruptedException {
        Bootstrap boot = new Bootstrap();
        boot.group(workerGroup);
        boot.channel(NioSocketChannel.class);
        boot.option(ChannelOption.SO_KEEPALIVE, true);
        boot.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {            
                // Inbound
                ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 0));
                ch.pipeline().addLast(new SomeDataDecoder()); 

                // Outbound
                ch.pipeline().addLast(new LengthFieldPrepender(2));
                ch.pipeline().addLast(new SomeDataEncoder());

                // Handler
                ch.pipeline().addLast(new SomeDataHandler());
            }
        });

        // Start the client
        f = boot.connect(host, port).sync();
        f.addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture future) throws Exception {
                System.out.println("connected to server");
                ch = f.channel();
            }
        });
    }

    public void stopClient() {      
        workerGroup.shutdownGracefully();
    }

    private void writeMessage(String input) {
        SomeData data = new SomeData("client", "localhost", 3333);
        ChannelFuture fut = ch.writeAndFlush(data);
        fut.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                System.out.println("send message");
            }
        });
    }

    public static void main(String[] args) throws InterruptedException {
        Client client = new Client();
        client.startClient();        

        System.out.println("running.\n\n");
        final Scanner scanner = new Scanner(System.in);

        while (true) {

            final String input = scanner.nextLine();

            if ("q".equals(input.trim())) {
                break;
            } else {
                client.writeMessage(input);
            }   
        }   

        scanner.close();
        client.stopClient();  //call this at some point to shutdown the client
    }

}

和处理程序:

public class SomeDataHandler extends SimpleChannelInboundHandler<SomeData> {

    private ChannelHandlerContext ctx;

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println("connected");
        this.ctx = ctx;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, SomeData msg) throws Exception {
        System.out.println("got message: " + msg);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {   
        System.out.println("caught exception: " + cause.getMessage());
        ctx.close();
    }
}

当我通过服务器端的控制台发送消息时,我得到输出:

running.
duplex channel active
duplex read
idle: ALL_IDLE
idle: ALL_IDLE

send message ok

因此看起来好像消息已发送但客户端未收到任何内容。

当我从客户端执行此操作时(在服务器控制台上):

in decoder, numBytes in message = 31
duplex channelRead
received: SomeData [name=client, ip=localhost, port=3333]

这是我所期望的。

那么问题出在哪里呢?是否与在服务器端使用 ChannelDuplexHandler 和在客户端使用 SimpleChannelInboundHandler 有关?我需要调用什么来将消息踢下管道吗?

更新 我在服务器 sendMessage 方法中添加了对 future.isSuccess() 的检查,我得到了 在控制台发送错误:java.lang.UnsupportedOperationException

最佳答案

(代表 OP 发布)

对于任何感兴趣的人来说,问题是我试图在服务器 channel 而不是正常 channel 上发送消息。 This post为我指明了正确的方向。

关于java - Netty channel 写入未到达处理程序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42246482/

有关java - Netty channel 写入未到达处理程序的更多相关文章

  1. ruby - 在 Ruby 程序执行时阻止 Windows 7 PC 进入休眠状态 - 2

    我需要在客户计算机上运行Ruby应用程序。通常需要几天才能完成(复制大备份文件)。问题是如果启用sleep,它会中断应用程序。否则,计算机将持续运行数周,直到我下次访问为止。有什么方法可以防止执行期间休眠并让Windows在执行后休眠吗?欢迎任何疯狂的想法;-) 最佳答案 Here建议使用SetThreadExecutionStateWinAPI函数,使应用程序能够通知系统它正在使用中,从而防止系统在应用程序运行时进入休眠状态或关闭显示。像这样的东西:require'Win32API'ES_AWAYMODE_REQUIRED=0x0

  2. ruby - 如何指定 Rack 处理程序 - 2

    Rackup通过Rack的默认处理程序成功运行任何Rack应用程序。例如:classRackAppdefcall(environment)['200',{'Content-Type'=>'text/html'},["Helloworld"]]endendrunRackApp.new但是当最后一行更改为使用Rack的内置CGI处理程序时,rackup给出“NoMethodErrorat/undefinedmethod`call'fornil:NilClass”:Rack::Handler::CGI.runRackApp.newRack的其他内置处理程序也提出了同样的反对意见。例如Rack

  3. ruby - 在 Ruby 中编写命令行实用程序 - 2

    我想用ruby​​编写一个小的命令行实用程序并将其作为gem分发。我知道安装后,Guard、Sass和Thor等某些gem可以从命令行自行运行。为了让gem像二进制文件一样可用,我需要在我的gemspec中指定什么。 最佳答案 Gem::Specification.newdo|s|...s.executable='name_of_executable'...endhttp://docs.rubygems.org/read/chapter/20 关于ruby-在Ruby中编写命令行实用程序

  4. ruby-on-rails - Rails 应用程序之间的通信 - 2

    我构建了两个需要相互通信和发送文件的Rails应用程序。例如,一个Rails应用程序会发送请求以查看其他应用程序数据库中的表。然后另一个应用程序将呈现该表的json并将其发回。我还希望一个应用程序将存储在其公共(public)目录中的文本文件发送到另一个应用程序的公共(public)目录。我从来没有做过这样的事情,所以我什至不知道从哪里开始。任何帮助,将不胜感激。谢谢! 最佳答案 无论Rails是什么,几乎所有Web应用程序都有您的要求,大多数现代Web应用程序都需要相互通信。但是有一个小小的理解需要你坚持下去,网站不应直接访问彼此

  5. ruby - 无法运行 Rails 2.x 应用程序 - 2

    我尝试运行2.x应用程序。我使用rvm并为此应用程序设置其他版本的ruby​​:$rvmuseree-1.8.7-head我尝试运行服务器,然后出现很多错误:$script/serverNOTE:Gem.source_indexisdeprecated,useSpecification.Itwillberemovedonorafter2011-11-01.Gem.source_indexcalledfrom/Users/serg/rails_projects_terminal/work_proj/spohelp/config/../vendor/rails/railties/lib/r

  6. ruby-on-rails - Rails 应用程序中的 Rails : How are you using application_controller. rb 是新手吗? - 2

    刚入门rails,开始慢慢理解。有人可以解释或给我一些关于在application_controller中编码的好处或时间和原因的想法吗?有哪些用例。您如何为Rails应用程序使用应用程序Controller?我不想在那里放太多代码,因为据我了解,每个请求都会调用此Controller。这是真的? 最佳答案 ApplicationController实际上是您应用程序中的每个其他Controller都将从中继承的类(尽管这不是强制性的)。我同意不要用太多代码弄乱它并保持干净整洁的态度,尽管在某些情况下ApplicationContr

  7. Ruby 写入和读取对象到文件 - 2

    好的,所以我的目标是轻松地将一些数据保存到磁盘以备后用。您如何简单地写入然后读取一个对象?所以如果我有一个简单的类classCattr_accessor:a,:bdefinitialize(a,b)@a,@b=a,bendend所以如果我从中非常快地制作一个objobj=C.new("foo","bar")#justgaveitsomerandomvalues然后我可以把它变成一个kindaidstring=obj.to_s#whichreturns""我终于可以将此字符串打印到文件或其他内容中。我的问题是,我该如何再次将这个id变回一个对象?我知道我可以自己挑选信息并制作一个接受该信

  8. java - 等价于 Java 中的 Ruby Hash - 2

    我真的很习惯使用Ruby编写以下代码:my_hash={}my_hash['test']=1Java中对应的数据结构是什么? 最佳答案 HashMapmap=newHashMap();map.put("test",1);我假设? 关于java-等价于Java中的RubyHash,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/22737685/

  9. ruby-on-rails - 如何在我的 Rails 应用程序 View 中打印 ruby​​ 变量的内容? - 2

    我是一个Rails初学者,但我想从我的RailsView(html.haml文件)中查看Ruby变量的内容。我试图在ruby​​中打印出变量(认为它会在终端中出现),但没有得到任何结果。有什么建议吗?我知道Rails调试器,但更喜欢使用inspect来打印我的变量。 最佳答案 您可以在View中使用puts方法将信息输出到服务器控制台。您应该能够在View中的任何位置使用Haml执行以下操作:-puts@my_variable.inspect 关于ruby-on-rails-如何在我的R

  10. java - 从 JRuby 调用 Java 类的问题 - 2

    我正在尝试使用boilerpipe来自JRuby。我看过guide从JRuby调用Java,并成功地将它与另一个Java包一起使用,但无法弄清楚为什么同样的东西不能用于boilerpipe。我正在尝试基本上从JRuby中执行与此Java等效的操作:URLurl=newURL("http://www.example.com/some-location/index.html");Stringtext=ArticleExtractor.INSTANCE.getText(url);在JRuby中试过这个:require'java'url=java.net.URL.new("http://www

随机推荐