我仍在努力让Camel(2.16.1)和Netty(4.0.33)都能接收到自由选择长度的tcp内容。由于收到的tcp内容大小未知,我还无法为创建一个工作解码器。
让我举例说明我的问题。假设我有一个长度为3129字节的文件。当我将该文件nc
发送到我的路由时,直到读取最后一个字节才知道大小:
cat file.bin | nc localhost 10001
我的路线定义如下:
from( "netty4:tcp://127.0.0.1:10001?sync=false&allowDefaultCodec=false&
decoder=#factory&receiveBufferSize=1000000")
.to("file:/temp/in");
工厂看起来是这样的,因为我需要确保每个ChannelHandler只使用一次:
public class Factory implements ChannelHandlerFactory {
@Override
public ChannelHandler newChannelHandler() {
return new RawPrinterDecoder();
}
}
在我的解码器中,我有这样的代码:
public class RawPrinterDecoder extends ReplayingDecoder<Void> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in,
List<Object> out) throws Exception {
while (in.isReadable()) {
byte readByte = in.readByte();
job.addContent(readByte);
}
in.discardReadBytes();
}
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("Bytes in job: " + job.getSize() );
}
}
这样做的问题是,我接收的不是3129字节,而是9273字节。原因是该文件被拆分为1024字节的3个段和57字节的1个段。这些片段被反复传递到我的解码器,尽管我试图在第一次用in.discardReadBytes()
处理后使片段无效,但它们会被再次处理,所以。。。
- 分段1
- 分段2
- 分段3
- 分段4
我的解码器看到它们像这个
- 分段1
- segment1+segment2
- segment1+segment2+segment3
- segment1+segment2+segment3+segment4
我试图通过使用checkpoint()
来解决我的问题,但这些段仍然被反复调用。
如何确保每个片段只处理一次并按正确顺序处理?如果这可以更有效地完成,而不是读取单个字节,那么欢迎推荐(readableBytes()
总是返回2GB,所以我不能用它来获取字节数)。
由于您的服务器正在使用ByteBuffAllocator,它被分成了多个段。你可以这样改变:
@Bean
public ChannelInitializer<SocketChannel> channelInitializer() {
return new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.config().setRecvByteBufAllocator(new FixedRecvByteBufAllocator(2048));
}
};
}
您可以使用一次读取所有可用的字节
ByteBuf buffer = in.readBytes(in);
或
ByteBuf buffer = in.readSlice(in.readableBytes());