正在缓冲C#中的字节数据

本文关键字:字节 字节数 数据 缓冲 | 更新日期: 2023-09-27 18:06:02

我的应用程序从TCP套接字读取字节,并需要对它们进行缓冲,以便以后可以从中提取消息。由于TCP的性质,我可能在一次读取中获得部分或多条消息,因此在每次读取后,我希望检查缓冲区并提取尽可能多的完整消息。

因此,我想要一个允许我做以下事情的类:

  • 向其附加任意字节[]数据
  • 在不消耗内容的情况下检查内容,特别是检查内容的数量,并搜索某个或多个字节的存在
  • 提取并使用部分数据作为字节[],而将其余数据保留在那里以备将来读取

我希望我想要的可以用.NET库中的一个或多个现有类来完成,但我不确定是哪一个System.IO.MemoryStream看起来很接近我想要的,但(a(不清楚它是否适合用作缓冲区(读取的数据是否会从容量中删除?(和(b(读取和写入似乎发生在同一个位置-"流的当前位置是下一次读取或写入操作可能发生的位置。">-这不是我想要的。我需要写到最后,从前面开始阅读。

正在缓冲C#中的字节数据

我建议您在后台使用MemoryStream,但将其封装在另一个存储的类中

  • MemoryStream
  • 当前"读取"位置
  • 当前"消耗"位置

然后它将暴露:

  • 写入:将流的位置设置为末尾,写入数据,将流的位置回读取位置
  • 读取:读取数据,将读取位置设置为流的位置
  • 消费:更新已消费的头寸(根据您尝试消费的方式进行详细说明(;如果消耗位置高于某个阈值,则将现有缓冲的数据复制到新的MemoryStream中,并更新所有变量。(您可能不想在每次消费请求时复制上的缓冲区。(

请注意,如果没有额外的同步,这些都不会是线程安全的。

只需使用一个大字节数组和array.Copy-它就可以了。如果没有,请使用List<byte>

如果您使用数组,您必须自己实现对它的索引(复制额外数据的地方((检查内容大小也是如此(,但这很简单。

如果您感兴趣:这里有一个"循环缓冲区"的简单实现。测试应该运行(我对它进行了几个单元测试,但它没有检查所有关键路径(:

public class ReadWriteBuffer
{
    private readonly byte[] _buffer;
    private int _startIndex, _endIndex;
    public ReadWriteBuffer(int capacity)
    {
        _buffer = new byte[capacity];
    }
    public int Count
    {
        get
        {
            if (_endIndex > _startIndex)
                return _endIndex - _startIndex;
            if (_endIndex < _startIndex)
                return (_buffer.Length - _startIndex) + _endIndex;
            return 0;
        }
    }
    public void Write(byte[] data)
    {
        if (Count + data.Length > _buffer.Length)
            throw new Exception("buffer overflow");
        if (_endIndex + data.Length >= _buffer.Length)
        {
            var endLen = _buffer.Length - _endIndex;
            var remainingLen = data.Length - endLen;
            Array.Copy(data, 0, _buffer, _endIndex, endLen);
            Array.Copy(data, endLen, _buffer, 0, remainingLen);
            _endIndex = remainingLen;
        }
        else
        {
            Array.Copy(data, 0, _buffer, _endIndex, data.Length);
            _endIndex += data.Length;
        }
    }
    public byte[] Read(int len, bool keepData = false)
    {
        if (len > Count)
            throw new Exception("not enough data in buffer");
        var result = new byte[len];
        if (_startIndex + len < _buffer.Length)
        {
            Array.Copy(_buffer, _startIndex, result, 0, len);
            if (!keepData)
                _startIndex += len;
            return result;
        }
        else
        {
            var endLen = _buffer.Length - _startIndex;
            var remainingLen = len - endLen;
            Array.Copy(_buffer, _startIndex, result, 0, endLen);
            Array.Copy(_buffer, 0, result, endLen, remainingLen);
            if (!keepData)
                _startIndex = remainingLen;
            return result;
        }
    }
    public byte this[int index]
    {
        get
        {
            if (index >= Count)
                throw new ArgumentOutOfRangeException();
            return _buffer[(_startIndex + index) % _buffer.Length];
        }
    }
    public IEnumerable<byte> Bytes
    {
        get
        {
            for (var i = 0; i < Count; i++)
                yield return _buffer[(_startIndex + i) % _buffer.Length];
        }
    }
}

请注意:代码在读取时"consumps"-如果你不想这样做,只需删除"_startIndex=…"部分(或者使重载成为可选参数和check或其他什么(。

我认为BufferedStream是这个问题的解决方案。也可以通过调用Seek来读取len字节的未读数据。

BufferdStream buffer = new BufferedStream(tcpStream, size); // we have a buffer of size
...
...
while(...)
{
    buffer.Read(...);
    // do my staff
    // I read too much, I want to put back len bytes
    buffer.Seek(-len, SeekOrigin.End);
    // I shall come back and read later
}

不断增长的记忆

与最初指定sizeBufferedStream相反,MemoryStream可以增长。

记住流数据

MemoryStream始终保存所有读取的数据,而BufferedStream仅保存流数据的一段。

源流与字节数组

MemoryStream允许在Write()方法中添加输入字节,该方法将来可以是Read()。而BufferedSteam从构造函数中指定的另一个源流中获取输入字节。

这是我不久前写的缓冲区的另一个实现:

  • 可调整大小:允许数据排队,不抛出缓冲区溢出异常
  • 高效:使用单个缓冲区和buffer.Copy操作使数据入队/出队

来晚了,但为了子孙后代:

当我过去这样做的时候,我会采取稍微不同的方法如果您的消息具有固定的报头大小(这告诉您正文中有多少字节(,并且考虑到网络流已经缓冲,我将分两个阶段执行操作:

  • 在流上读取标头的字节
  • 基于标头对流进行的后续读取,读取正文的字节
  • 重复

这利用了这样一个事实,即对于流,当你要求"n"个字节时,你永远不会再得到,所以你可以忽略许多"opp I read too many,让我把这些放在一边,直到下次"的问题。

公平地说,这还不是故事的全部。我在流上有一个底层的包装器类来处理碎片问题(即,如果要求4个字节,在收到4个字节之前不要返回,或者流关闭(。但这一点相当容易。

在我看来,关键是将消息处理与流机制解耦,如果你不再试图将消息作为流中的单个ReadBytes((来使用,那么生活就会变得简单得多。

[无论您的读取是阻塞还是异步(APM/await(,所有这些都是正确的]

听起来像是要从套接字读取到MemoryStream缓冲区,然后从缓冲区中"弹出"数据,并在每次遇到特定字节时重置它。它看起来像这样:

void ReceiveAllMessages(Action<byte[]> messageReceived, Socket socket)
{
    var currentMessage = new MemoryStream();
    var buffer = new byte[128];
    while (true)
    {
        var read = socket.Receive(buffer, 0, buffer.Length);
        if (read == 0)
            break;     // Connection closed
        for (var i = 0; i < read; i++)
        {
            var currentByte = buffer[i];
            if (currentByte == END_OF_MESSAGE)
            {
                var message = currentMessage.ToByteArray();
                messageReceived(message);
                currentMessage = new MemoryStream();
            }
            else
            {
                currentMessage.Write(currentByte);
            }
        }
    }
}

您可以用Stream包装ConcurrentQueue<ArraySegment<byte>>来完成此操作(请记住,这只会使其向前(。然而,我真的不喜欢在使用数据之前将其保存在内存中的想法;它会让你面临一系列关于消息大小的攻击(无论是否有意(。你可能还想在谷歌上搜索"循环缓冲区"。

实际上,您应该编写的代码在收到数据后立即对数据进行有意义的处理:"Push Parsing"(例如,SAX支持这样做(。作为一个如何使用文本的例子:

private Encoding _encoding;
private Decoder _decoder;
private char[] _charData = new char[4];
public PushTextReader(Encoding encoding)
{
    _encoding = encoding;
    _decoder = _encoding.GetDecoder();
}
// A single connection requires its own decoder
// and charData. That connection should never
// call this method from multiple threads
// simultaneously.
// If you are using the ReadAsyncLoop you
// don't need to worry about it.
public void ReceiveData(ArraySegment<byte> data)
{
    // The two false parameters cause the decoder
    // to accept 'partial' characters.
    var charCount = _decoder.GetCharCount(data.Array, data.Offset, data.Count, false);
    charCount = _decoder.GetChars(data.Array, data.Offset, data.Count, _charData, 0, false);
    OnCharacterData(new ArraySegment<char>(_charData, 0, charCount));
}

如果必须能够在反序列化之前接受完整的消息,则可以使用MemoryMappedFile,其优点是发送实体不会耗尽服务器的内存。棘手的是将文件重置回零;因为这可能是一堆问题。解决这一问题的一种方法是:

TCP接收端

  1. 写入当前流
  2. 如果流超过某个长度,则移动到新的流

反序列化结束

  1. 从当前流中读取
  2. 一旦你清空了小溪,就把它毁掉

TCP接收端非常简单。解串器端将需要一些基本的缓冲拼接逻辑(记住使用Buffer.BlockCopy而不是Array.Copy(。

旁注:听起来是一个有趣的项目,如果我有时间并记住,我可能会继续实施这个系统。

这里只有三个答案可以提供代码。其中一个很笨拙,其他人不回答这个问题。

这里有一个你可以复制粘贴的类:

/// <summary>
/// This class is a very fast and threadsafe FIFO buffer
/// </summary>
public class FastFifo
{
    private List<Byte> mi_FifoData = new List<Byte>();
    /// <summary>
    /// Get the count of bytes in the Fifo buffer
    /// </summary>
    public int Count
    {
        get 
        { 
            lock (mi_FifoData)
            {
                return mi_FifoData.Count; 
            }
        }
    }
    /// <summary>
    /// Clears the Fifo buffer
    /// </summary>
    public void Clear()
    {
        lock (mi_FifoData)
        {
            mi_FifoData.Clear();
        }
    }
    /// <summary>
    /// Append data to the end of the fifo
    /// </summary>
    public void Push(Byte[] u8_Data)
    {
        lock (mi_FifoData)
        {
            // Internally the .NET framework uses Array.Copy() which is extremely fast
            mi_FifoData.AddRange(u8_Data);
        }
    }
    /// <summary>
    /// Get data from the beginning of the fifo.
    /// returns null if s32_Count bytes are not yet available.
    /// </summary>
    public Byte[] Pop(int s32_Count)
    {
        lock (mi_FifoData)
        {
            if (mi_FifoData.Count < s32_Count)
                return null;
            // Internally the .NET framework uses Array.Copy() which is extremely fast
            Byte[] u8_PopData = new Byte[s32_Count];
            mi_FifoData.CopyTo(0, u8_PopData, 0, s32_Count);
            mi_FifoData.RemoveRange(0, s32_Count);
            return u8_PopData;
        }
    }
    /// <summary>
    /// Gets a byte without removing it from the Fifo buffer
    /// returns -1 if the index is invalid
    /// </summary>
    public int PeekAt(int s32_Index)
    {
        lock (mi_FifoData)
        {
            if (s32_Index < 0 || s32_Index >= mi_FifoData.Count)
                return -1;
            return mi_FifoData[s32_Index];
        }
    }
}