Netty Handler not called



我正在尝试使用一个简单的服务器客户端应用程序进入Netty(代码见下文)。

我正在为两个问题而苦苦挣扎:

  1. ConfigServerHandler resp. ConfigClientHandler 被正确调用。但是 FeedbackServerHandler 需要。从不调用 FeedbackClientHandler。为什么?根据文档,应一个接一个地调用处理程序。

  2. 我想有几个处理程序。这些处理程序中的每一个都只对另一端发送的一些消息感兴趣(例如,由客户端发送,由服务器接收)。

    • 我是否应该在处理程序收到消息后过滤消息 (channelRead)?如何区分不同的字符串?对于不同的对象,通过解析它们应该很容易。
    • 是否可以为套接字通道定义不同的通道管道?
    • 进一步的方法?

感谢您的帮助!

凯杰

以下是服务器的创建方式:

public void run() throws Exception {
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
         .channel(NioServerSocketChannel.class)
         .handler(new LoggingHandler(LogLevel.INFO))
         .childHandler(new ChannelInitializer<SocketChannel>() {
             @Override
             public void initChannel(SocketChannel ch) throws Exception {
                 ChannelPipeline p = ch.pipeline();
                 p.addLast(
                     new ObjectEncoder(),
                     new ObjectDecoder(ClassResolvers.cacheDisabled(null)),
                     new ConfigServerHandler(),
                     new FeedbackServerHandler());
             }
         });
      b.bind(mPort).sync().channel().closeFuture().sync();
    } finally {
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}

其中一个处理程序类(FeedbackServerHandler 执行完全相同的操作,但解析为整数):

public class ConfigServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println("ConfigServerHandler::channelRead, " +(String)msg);
        ctx.write(msg);
    }
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
   }
}

客户端看起来非常相似:

public Client(String host, int port) throws InterruptedException {
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
        Bootstrap b = new Bootstrap();
        b.group(workerGroup)
         .channel(NioSocketChannel.class)
         .handler(new ChannelInitializer<SocketChannel>() {
             @Override
             public void initChannel(SocketChannel ch) throws Exception {
                 ChannelPipeline p = ch.pipeline();
                  p.addLast(
                      new ObjectEncoder(),
                      new ObjectDecoder(ClassResolvers.cacheDisabled(null)),
                      new ConfigClientHandler(),
                      new FeedbackClientHandler());
             }
         });
         b.connect(host, port).sync().channel().closeFuture().sync();
    } finally {
        workerGroup.shutdownGracefully();
    }
}

下面是客户端处理程序之一(另一个发送整数消息并在"channelRead"方法中解析为整数):

public class ConfigClientHandler extends ChannelInboundHandlerAdapter {
    private final String firstMessage = "blubber";
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println("ConfigClientHandler::channelActive");
        ctx.writeAndFlush(firstMessage);
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println("ConfigClientHandler::channelRead, " +(String)msg);
        ctx.write(msg);
    }
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

}

您正在使用ChannelInboundHandlerAdapter,这很好,用于您的"中间"处理程序ConfigXxxxxHandler

但是你使用channelRead方法,然后在ctx.write(msg)内部使用。 ctx.write(msg)首先通过上一个处理程序(ObjectDecoder)将味精写回另一个服务器,而不是下一个处理程序(在您的情况下FeedbackClientHandler)。

如果要将消息发送到下一个处理程序,则应使用以下命令:

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    System.out.println("ConfigClientHandler::channelRead, " +(String)msg);
    ctx.fireChannelRead(msg);
}

当然,channelReadComplete中没有ctx.flush()(因为不再在那里写)。但是在你的最后FeedbackClientHandler,当然,使用带有ctx.write(yourNewMessage)的刷新方法或使用ctx.writeAndFlush(yourNewMessage)

所以要恢复:

  • ctx.write会将消息发送到线路,因此将前一个处理程序发送到通道,然后发送到网络,因此出站方式
  • ctx.fireChannelRead会将消息发送到下一个后续处理程序(相反的方式),因此入站方式

有关详细信息,请参阅 http://netty.io/wiki/new-and-noteworthy-in-4.0.html#wiki-h4-16。

您可能还应该反转编码器/解码器,因为一般来说,最好先使用解码器,然后在管道中使用编码器。

            p.addLast(
                      new ObjectDecoder(ClassResolvers.cacheDisabled(null)),
                      new ObjectEncoder(),
                      new ConfigClientHandler(),
                      new FeedbackClientHandler());

最新更新