如何有效地将入站netty.io bytebuf消息分发给通道组



我创建了一个netty.io bootstrap,该bootstrap从旧服务器接收流数据。服务器使用ISO-8859-1 Charset发送数据。还有一个内部"协议",它使用不同的分离器字节:

private static final byte GS = 29;  // FS ASCII char 1D (group separator)
private static final byte RS = 30;  // FS ASCII char 1E (record separator)
private static final byte US = 31;  // FS ASCII char 1F (unit separator)
private static final ByteProcessor GROUP_SEPARATOR_LOCATOR = value -> value != GS;
private static final ByteProcessor RECORD_SEPARATOR_LOCATOR = value -> value != RS;
private static final ByteProcessor UNIT_SEPARATOR_LOCATOR = value -> value != US;

这些副处理器实例用于拆分消息。最终将每个消息转换为相应的对象表示形式,而KeyValueMapping包含来自原始消息的主要内容:

public class Update {
    private final String id;    
    private final UpdateType updateType;
    private final Map<String, byte[]> keyValueMapping;
    // SOME OTHER STUFF
}

随后所有更新都转发给所有连接的Web插座客户端,这些端子由单独的ServerBootstrap处理:

public void distribute(ChannelGroup recipients, Object msg) {
    Update updateMsg = (Update) msg;
    recipients.writeAndFlush(updateMsg);
}

当我激活Java Flight记录并执行了一些负载测试时,我意识到主要分配热点是将初始入口消息转换为AM ISO-8859-1字节数组的方法:

private byte[] translateValue(ByteBuf in) {
    byte [] result;
    if (!in.hasArray()) {
        result = new byte[in.readableBytes()];
        in.getBytes(in.readerIndex(), result);
    } else {
        result = in.array();
    }
    return result;
}

最初,我没有翻译ByteBuf,而是将它们直接存储在更新的KeyValueMapping映射中。由于Bytebuf对象维护一些不受保护的内部索引(读者,作家,标记等) - 设计 - 我害怕只需将这些Bytebufs包装并转发到不同的频道(请参阅上面的收件人ChannelGroup),然后决定与此相关。BYTE []表示。

检查Java飞行记录结果,我想知道是否有任何建议如何将不变的入站数据分配给一组不同的渠道,而不会过多地接触GC?从结果中学习,直接缓冲区用于给定的频道,因为创建了许多新的字节阵列。

要提供更多上下文,我还添加了执行其余消息翻译的代码:

while (in.readableBytes() > 0) {
    ByteBuf keyAsByteBuf = nextToken(in, UNIT_SEPARATOR_LOCATOR);
    String key = translateKey(keyAsByteBuf);
    if (key != null) {
        ByteBuf valueAsByteBuf = nextToken(in, RECORD_SEPARATOR_LOCATOR);
        byte[] value = translateValue(valueAsByteBuf);
        if (value.length > 0) {
            mapping.put(key, value); 
        }
    }
}
private ByteBuf nextToken(ByteBuf in, ByteProcessor locator) {
    int separatorIdx = in.forEachByte(in.readerIndex(), in.readableBytes(), locator);
    if (separatorIdx >= 0) {
        ByteBuf token = in.readSlice(separatorIdx - in.readerIndex());
        in.skipBytes(1);
        return token;
    }
    return in.readSlice(in.readableBytes());
}
private String translateKey(ByteBuf in) {
    return keyTranslator.translate(in);
}

hm ...实际上,您的问题并不那么简单。我会尝试简短回答。

如果您在应用中不需要的话,则无需将ByteBuf转换为byte[]。所以我认为您有下一个结构:

public class Update {
    private final String id;    
    private final UpdateType updateType;
    private final Map<String, ByteBuf> keyValueMapping;
}

这里的问题是您部分解析ByteBuf。因此,您在此Java对象中具有Java对象 ByteBuf的s。

这完全可以,您可以进一步使用这些ByteBuf's操作。您的类Update应实现ReferenceCounted接口。因此,当您执行recipients.writeAndFlush(updateMsg)(假设收件人是DefaultChannelGroup)Netty DefaultChannelGroup将处理对那些缓冲区的参考。

所以发生了什么:

recipients.writeAndFlush(updateMsg)之后,循环中的DefaultChannelGroup将您的updateMsg通过channel.writeAndFlush(safeDuplicate(message))发送到列表中的每个通道。safeDuplicate是处理Netty ByteBuf的引用的特殊方法,因此您可以将相同的缓冲区发送到多个接收器(实际上它将使用retainedDuplicate()复制缓冲区)。但是,您的对象不是ByteBuf,而是Java对象。这是该方法的代码:

private static Object safeDuplicate(Object message) {
    if (message instanceof ByteBuf) {
        return ((ByteBuf) message).retainedDuplicate();
    } else if (message instanceof ByteBufHolder) {
        return ((ByteBufHolder) message).retainedDuplicate();
    } else {
        return ReferenceCountUtil.retain(message);
    }
}

因此,要正确处理ByteBuf的参考,您需要为ReferenceCountUtil.retain(message)实现ReferenceCounted。这样的东西:

public class Update implements ReferenceCounted {
    @Override
    public final Update retain() {
        return new Update(id, updateType, makeRetainedBuffers());
    }  
    private Map makeRetainedBuffers() {
       Map newMap = new HashMap();
       for (Entry entry : keyValueMapping) {
           newMap.put(entry.key, entry.value.duplicate().retain())
       }
       return newMap;
    }
}

这只是伪代码。但是你应该明白。您还必须在Update类中实现release()方法,并确保它始终释放其保留的缓冲区。并在内部释放所有缓冲区。我假设您已经在管道中为Update类中的编码器提供了调用release()

另一个选项是实现自己的DefaultChannelGroup。在这种情况下,您不必依靠safeDuplicate方法。因此,您无需实现ReferenceCounted,但是您仍然需要处理保留,在该类中手动释放。

最新更新