ChannelHandler正在处理未知大小的内容



我仍在努力让Camel(2.16.1)和Netty(4.0.33)都能接收到自由选择长度的tcp内容。由于收到的tcp内容大小未知,我还无法为创建一个工作解码器。

让我举例说明我的问题。假设我有一个长度为3129字节的文件。当我将该文件nc发送到我的路由时,直到读取最后一个字节才知道大小:

cat file.bin | nc localhost 10001

我的路线定义如下:

from( "netty4:tcp://127.0.0.1:10001?sync=false&allowDefaultCodec=false&
      decoder=#factory&receiveBufferSize=1000000")
.to("file:/temp/in");

工厂看起来是这样的,因为我需要确保每个ChannelHandler只使用一次:

public class Factory implements ChannelHandlerFactory {
    @Override
    public ChannelHandler newChannelHandler() {
        return new RawPrinterDecoder();
    }
}

在我的解码器中,我有这样的代码:

public class RawPrinterDecoder extends ReplayingDecoder<Void> {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in,
            List<Object> out) throws Exception {
        while (in.isReadable()) {
            byte readByte = in.readByte();
            job.addContent(readByte);
        }
        in.discardReadBytes();
    }
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Bytes in job: " + job.getSize() );
    }
}

这样做的问题是,我接收的不是3129字节,而是9273字节。原因是该文件被拆分为1024字节的3个段和57字节的1个段。这些片段被反复传递到我的解码器,尽管我试图在第一次用in.discardReadBytes()处理后使片段无效,但它们会被再次处理,所以。。。

  • 分段1
  • 分段2
  • 分段3
  • 分段4

我的解码器看到它们像这个

  • 分段1
  • segment1+segment2
  • segment1+segment2+segment3
  • segment1+segment2+segment3+segment4

我试图通过使用checkpoint()来解决我的问题,但这些段仍然被反复调用。

如何确保每个片段只处理一次并按正确顺序处理?如果这可以更有效地完成,而不是读取单个字节,那么欢迎推荐(readableBytes()总是返回2GB,所以我不能用它来获取字节数)。

由于您的服务器正在使用ByteBuffAllocator,它被分成了多个段。你可以这样改变:

@Bean
public ChannelInitializer<SocketChannel> channelInitializer() {
    return new ChannelInitializer<SocketChannel>() {
        @Override
        public void initChannel(SocketChannel ch) throws Exception {
            ch.config().setRecvByteBufAllocator(new FixedRecvByteBufAllocator(2048));
        }
    };
}

您可以使用一次读取所有可用的字节

ByteBuf buffer = in.readBytes(in);

ByteBuf buffer = in.readSlice(in.readableBytes());

最新更新