正在缓冲C#中的字节数据
本文关键字:字节 字节数 数据 缓冲 | 更新日期: 2023-09-27 18:06:02
我的应用程序从TCP套接字读取字节,并需要对它们进行缓冲,以便以后可以从中提取消息。由于TCP的性质,我可能在一次读取中获得部分或多条消息,因此在每次读取后,我希望检查缓冲区并提取尽可能多的完整消息。
因此,我想要一个允许我做以下事情的类:
- 向其附加任意字节[]数据
- 在不消耗内容的情况下检查内容,特别是检查内容的数量,并搜索某个或多个字节的存在
- 提取并使用部分数据作为字节[],而将其余数据保留在那里以备将来读取
我希望我想要的可以用.NET库中的一个或多个现有类来完成,但我不确定是哪一个System.IO.MemoryStream看起来很接近我想要的,但(a(不清楚它是否适合用作缓冲区(读取的数据是否会从容量中删除?(和(b(读取和写入似乎发生在同一个位置-"流的当前位置是下一次读取或写入操作可能发生的位置。">-这不是我想要的。我需要写到最后,从前面开始阅读。
我建议您在后台使用MemoryStream
,但将其封装在另一个存储的类中
MemoryStream
- 当前"读取"位置
- 当前"消耗"位置
然后它将暴露:
- 写入:将流的位置设置为末尾,写入数据,将流的位置回读取位置
- 读取:读取数据,将读取位置设置为流的位置
- 消费:更新已消费的头寸(根据您尝试消费的方式进行详细说明(;如果消耗位置高于某个阈值,则将现有缓冲的数据复制到新的
MemoryStream
中,并更新所有变量。(您可能不想在每次消费请求时复制上的缓冲区。(
请注意,如果没有额外的同步,这些都不会是线程安全的。
只需使用一个大字节数组和array.Copy-它就可以了。如果没有,请使用List<byte>
。
如果您使用数组,您必须自己实现对它的索引(复制额外数据的地方((检查内容大小也是如此(,但这很简单。
如果您感兴趣:这里有一个"循环缓冲区"的简单实现。测试应该运行(我对它进行了几个单元测试,但它没有检查所有关键路径(:
public class ReadWriteBuffer
{
private readonly byte[] _buffer;
private int _startIndex, _endIndex;
public ReadWriteBuffer(int capacity)
{
_buffer = new byte[capacity];
}
public int Count
{
get
{
if (_endIndex > _startIndex)
return _endIndex - _startIndex;
if (_endIndex < _startIndex)
return (_buffer.Length - _startIndex) + _endIndex;
return 0;
}
}
public void Write(byte[] data)
{
if (Count + data.Length > _buffer.Length)
throw new Exception("buffer overflow");
if (_endIndex + data.Length >= _buffer.Length)
{
var endLen = _buffer.Length - _endIndex;
var remainingLen = data.Length - endLen;
Array.Copy(data, 0, _buffer, _endIndex, endLen);
Array.Copy(data, endLen, _buffer, 0, remainingLen);
_endIndex = remainingLen;
}
else
{
Array.Copy(data, 0, _buffer, _endIndex, data.Length);
_endIndex += data.Length;
}
}
public byte[] Read(int len, bool keepData = false)
{
if (len > Count)
throw new Exception("not enough data in buffer");
var result = new byte[len];
if (_startIndex + len < _buffer.Length)
{
Array.Copy(_buffer, _startIndex, result, 0, len);
if (!keepData)
_startIndex += len;
return result;
}
else
{
var endLen = _buffer.Length - _startIndex;
var remainingLen = len - endLen;
Array.Copy(_buffer, _startIndex, result, 0, endLen);
Array.Copy(_buffer, 0, result, endLen, remainingLen);
if (!keepData)
_startIndex = remainingLen;
return result;
}
}
public byte this[int index]
{
get
{
if (index >= Count)
throw new ArgumentOutOfRangeException();
return _buffer[(_startIndex + index) % _buffer.Length];
}
}
public IEnumerable<byte> Bytes
{
get
{
for (var i = 0; i < Count; i++)
yield return _buffer[(_startIndex + i) % _buffer.Length];
}
}
}
请注意:代码在读取时"consumps"-如果你不想这样做,只需删除"_startIndex=…"部分(或者使重载成为可选参数和check或其他什么(。
我认为BufferedStream
是这个问题的解决方案。也可以通过调用Seek来读取len
字节的未读数据。
BufferdStream buffer = new BufferedStream(tcpStream, size); // we have a buffer of size
...
...
while(...)
{
buffer.Read(...);
// do my staff
// I read too much, I want to put back len bytes
buffer.Seek(-len, SeekOrigin.End);
// I shall come back and read later
}
不断增长的记忆
与最初指定size
的BufferedStream
相反,MemoryStream
可以增长。
记住流数据
MemoryStream
始终保存所有读取的数据,而BufferedStream
仅保存流数据的一段。
源流与字节数组
MemoryStream
允许在Write()
方法中添加输入字节,该方法将来可以是Read()
。而BufferedSteam
从构造函数中指定的另一个源流中获取输入字节。
这是我不久前写的缓冲区的另一个实现:
- 可调整大小:允许数据排队,不抛出缓冲区溢出异常
- 高效:使用单个缓冲区和buffer.Copy操作使数据入队/出队
来晚了,但为了子孙后代:
当我过去这样做的时候,我会采取稍微不同的方法如果您的消息具有固定的报头大小(这告诉您正文中有多少字节(,并且考虑到网络流已经缓冲,我将分两个阶段执行操作:
- 在流上读取标头的字节
- 基于标头对流进行的后续读取,读取正文的字节
- 重复
这利用了这样一个事实,即对于流,当你要求"n"个字节时,你永远不会再得到,所以你可以忽略许多"opp I read too many,让我把这些放在一边,直到下次"的问题。
公平地说,这还不是故事的全部。我在流上有一个底层的包装器类来处理碎片问题(即,如果要求4个字节,在收到4个字节之前不要返回,或者流关闭(。但这一点相当容易。
在我看来,关键是将消息处理与流机制解耦,如果你不再试图将消息作为流中的单个ReadBytes((来使用,那么生活就会变得简单得多。
[无论您的读取是阻塞还是异步(APM/await(,所有这些都是正确的]
听起来像是要从套接字读取到MemoryStream缓冲区,然后从缓冲区中"弹出"数据,并在每次遇到特定字节时重置它。它看起来像这样:
void ReceiveAllMessages(Action<byte[]> messageReceived, Socket socket)
{
var currentMessage = new MemoryStream();
var buffer = new byte[128];
while (true)
{
var read = socket.Receive(buffer, 0, buffer.Length);
if (read == 0)
break; // Connection closed
for (var i = 0; i < read; i++)
{
var currentByte = buffer[i];
if (currentByte == END_OF_MESSAGE)
{
var message = currentMessage.ToByteArray();
messageReceived(message);
currentMessage = new MemoryStream();
}
else
{
currentMessage.Write(currentByte);
}
}
}
}
您可以用Stream
包装ConcurrentQueue<ArraySegment<byte>>
来完成此操作(请记住,这只会使其向前(。然而,我真的不喜欢在使用数据之前将其保存在内存中的想法;它会让你面临一系列关于消息大小的攻击(无论是否有意(。你可能还想在谷歌上搜索"循环缓冲区"。
实际上,您应该编写的代码在收到数据后立即对数据进行有意义的处理:"Push Parsing"(例如,SAX支持这样做(。作为一个如何使用文本的例子:
private Encoding _encoding;
private Decoder _decoder;
private char[] _charData = new char[4];
public PushTextReader(Encoding encoding)
{
_encoding = encoding;
_decoder = _encoding.GetDecoder();
}
// A single connection requires its own decoder
// and charData. That connection should never
// call this method from multiple threads
// simultaneously.
// If you are using the ReadAsyncLoop you
// don't need to worry about it.
public void ReceiveData(ArraySegment<byte> data)
{
// The two false parameters cause the decoder
// to accept 'partial' characters.
var charCount = _decoder.GetCharCount(data.Array, data.Offset, data.Count, false);
charCount = _decoder.GetChars(data.Array, data.Offset, data.Count, _charData, 0, false);
OnCharacterData(new ArraySegment<char>(_charData, 0, charCount));
}
如果必须能够在反序列化之前接受完整的消息,则可以使用MemoryMappedFile,其优点是发送实体不会耗尽服务器的内存。棘手的是将文件重置回零;因为这可能是一堆问题。解决这一问题的一种方法是:
TCP接收端
- 写入当前流
- 如果流超过某个长度,则移动到新的流
反序列化结束
- 从当前流中读取
- 一旦你清空了小溪,就把它毁掉
TCP接收端非常简单。解串器端将需要一些基本的缓冲拼接逻辑(记住使用Buffer.BlockCopy
而不是Array.Copy
(。
旁注:听起来是一个有趣的项目,如果我有时间并记住,我可能会继续实施这个系统。
这里只有三个答案可以提供代码。其中一个很笨拙,其他人不回答这个问题。
这里有一个你可以复制粘贴的类:
/// <summary>
/// This class is a very fast and threadsafe FIFO buffer
/// </summary>
public class FastFifo
{
private List<Byte> mi_FifoData = new List<Byte>();
/// <summary>
/// Get the count of bytes in the Fifo buffer
/// </summary>
public int Count
{
get
{
lock (mi_FifoData)
{
return mi_FifoData.Count;
}
}
}
/// <summary>
/// Clears the Fifo buffer
/// </summary>
public void Clear()
{
lock (mi_FifoData)
{
mi_FifoData.Clear();
}
}
/// <summary>
/// Append data to the end of the fifo
/// </summary>
public void Push(Byte[] u8_Data)
{
lock (mi_FifoData)
{
// Internally the .NET framework uses Array.Copy() which is extremely fast
mi_FifoData.AddRange(u8_Data);
}
}
/// <summary>
/// Get data from the beginning of the fifo.
/// returns null if s32_Count bytes are not yet available.
/// </summary>
public Byte[] Pop(int s32_Count)
{
lock (mi_FifoData)
{
if (mi_FifoData.Count < s32_Count)
return null;
// Internally the .NET framework uses Array.Copy() which is extremely fast
Byte[] u8_PopData = new Byte[s32_Count];
mi_FifoData.CopyTo(0, u8_PopData, 0, s32_Count);
mi_FifoData.RemoveRange(0, s32_Count);
return u8_PopData;
}
}
/// <summary>
/// Gets a byte without removing it from the Fifo buffer
/// returns -1 if the index is invalid
/// </summary>
public int PeekAt(int s32_Index)
{
lock (mi_FifoData)
{
if (s32_Index < 0 || s32_Index >= mi_FifoData.Count)
return -1;
return mi_FifoData[s32_Index];
}
}
}