我正在使用TPL数据流块来实现基于数据包的网络协议。此协议是固定的,我无法更改。大多数情况下会有一些小包,但很多包。
我有一个连接到服务器并读取原始数据包的客户端组件。然后将这些原始数据包作为 MemoryStreams 发布到 BufferBlock 中,然后在 TransformBlock 中根据数据包类型/ID 解码为数据包结构。
若要发送数据包,此过程与另一个数据流块链相反。据我所知,所有这些都运作良好。
问题在于,根据服务器响应,这些数据包可能会也可能不会被压缩,并且可能会也可能不会加密。我会用中间的新转换块来解决这个问题(以解压缩为例):
static TransformBlock<Stream, Stream> CreateDecompressorBlock(ProtocolContext context)
{
return new TransformBlock<Stream, Stream>(stream =>
{
if (!context.CompressionEnabled) return stream;
return new DeflateStream(stream, CompressionMode.Decompress);
}
}
然而,在我看来,这不是正确的方法。据我了解,DeflateStream(和CryptoStream)在读取数据时对其进行解码。这意味着数据是在我解码数据包内容时解码的,而不是在 TransformBlock 本身内部解码的,在 TransformBlock 本身中只创建包装流。这似乎可以绕过数据流块的优势。
因此,我想到了另一个解决方案,我没有返回DeflateStream/CryptoStream,而是将其读取到另一个MemoryStream中:
static TransformBlock<Stream, Stream> CreateDecompressorBlock(ProtocolContext context)
{
return new TransformBlock<Stream, Stream>(stream =>
{
if (!context.CompressionEnabled) return stream;
using (var deflate = new DeflateStream(stream, CompressionMode.Decompress))
{
var ms = new MemoryStream();
deflate.CopyTo(ms);
return ms;
}
}
}
现在看来,这似乎是浪费内存。
所以我的问题是,仅仅包装流并让稍后解码数据包内容的 TransformBlock 完成工作就足够了,还是我应该使用更多的内存,然后可能有更好的分离和并行性?(虽然我不认为解码会成为瓶颈,但这将是网络)。
或者是否有一种模式可以与 TPL 数据流一起使用,可以更好地解决我的问题?
与往常一样,这是一种权衡,只能由您做出决定。
我会选择最简单的解决方案,即连接两个块,让写入块"吸收"增加的压缩复杂性,因为TDF块可以在需要时增加其并行性。但是,我只会在该块没有有限的并行性(MaxDegreeOfParallelism
)时才这样做。
如果有限制,那么我会按照您的描述处理压缩块中的实际压缩。这种行为可以在非常高的并行度下完成,这些内存缓冲区似乎并不是一个大问题。
如果是,则可以添加缓冲池,因此缓冲区计数将限制为块的MaxDegreeOfParallelism
,并且只需在应用程序的生存期内分配一次这些缓冲区。