在TPL Dataflow TransformBlock中使用DeflateStream和/或CryptoStream

本文关键字:DeflateStream CryptoStream TPL Dataflow TransformBlock | 更新日期: 2023-09-27 18:36:46

我正在使用TPL数据流块来实现基于数据包的网络协议。此协议是固定的,我无法更改。大多数情况下会有一些小包,但很多包。

我有一个连接到服务器并读取原始数据包的客户端组件。然后将这些原始数据包作为 MemoryStreams 发布到 BufferBlock 中,然后在 TransformBlock 中根据数据包类型/ID 解码为数据包结构。

若要发送数据包,此过程与另一个数据流块链相反。据我所知,所有这些都运作良好。

问题在于,根据服务器响应,这些数据包可能会也可能不会被压缩,并且可能会也可能不会加密。我会用中间的新转换块来解决这个问题(以解压缩为例):

static TransformBlock<Stream, Stream> CreateDecompressorBlock(ProtocolContext context)
{
    return new TransformBlock<Stream, Stream>(stream =>
    {
        if (!context.CompressionEnabled) return stream;
        return new DeflateStream(stream, CompressionMode.Decompress);
    }
}

然而,在我看来,这不是正确的方法。据我了解,DeflateStream(和CryptoStream)在读取数据时对其进行解码。这意味着数据是在我解码数据包内容时解码的,而不是在 TransformBlock 本身内部解码的,在 TransformBlock 本身中只创建包装流。这似乎可以绕过数据流块的优势。

因此,我想到了另一个解决方案,我没有返回DeflateStream/CryptoStream,而是将其读取到另一个MemoryStream中:

static TransformBlock<Stream, Stream> CreateDecompressorBlock(ProtocolContext context)
{
    return new TransformBlock<Stream, Stream>(stream =>
    {
        if (!context.CompressionEnabled) return stream;
        using (var deflate = new DeflateStream(stream, CompressionMode.Decompress))
        {
            var ms = new MemoryStream();
            deflate.CopyTo(ms);
            return ms;
        }
    }
}

现在看来,这似乎是浪费内存。

所以我的问题是,仅仅包装流并让稍后解码数据包内容的 TransformBlock 完成工作就足够了,还是我应该使用更多的内存,然后可能有更好的分离和并行性?(虽然我不认为解码会成为瓶颈,但这将是网络)。

或者是否有一种模式可以与 TPL 数据流一起使用,可以更好地解决我的问题?

在TPL Dataflow TransformBlock中使用DeflateStream和/或CryptoStream

与往常一样,这是一种权衡,只能由您做出决定。

我会选择最简单的解决方案,即连接两个块,让写入块"吸收"增加的压缩复杂性,因为TDF块可以在需要时增加其并行性。但是,我只会在该块没有有限的并行性(MaxDegreeOfParallelism)时才这样做。

如果有限制,那么我会按照您的描述处理压缩块中的实际压缩。这种行为可以在非常高的并行度下完成,这些内存缓冲区似乎并不是一个大问题。

如果是,则可以添加缓冲池,因此缓冲区计数将限制为块的MaxDegreeOfParallelism,并且只需在应用程序的生存期内分配一次这些缓冲区。