从套接字读取连续消息

本文关键字:消息 连续 读取 套接字 | 更新日期: 2023-09-27 18:18:26

我的目标是从套接字读取消息,其中每个消息用ETX字符分隔。这是一个高频率的市场数据馈送,所以我不认为逐字节的方法有意义,而且完整消息的大小是未知的。

是否有一种方法可以通过使用NetworkStream类来读取此消息?我也尝试过使用Socket类用于此目的,但不是从套接字逐一读取消息,而是从套接字读取所有消息,这成为一个问题,因为系统变慢了。

从套接字读取连续消息

开始;下面是用于从SocketStream等源读取哨兵分隔消息列表的基本过程。棘手的一点是跟踪您在传入缓冲区中使用的内容,以及来自早期缓冲区的未使用数据的积压。请注意,在SocketStream之间更改此代码实际上是将Receive更改为Read -除了方法相同之外。

下面的内容基本上可以满足你的需要。您可以使用ReadNext() API,直到您获得null(它表示流的结束),或者您可以使用ReadAll(),它为您提供IEnumerable<string>序列。您可以通过构造函数调整编码和缓冲区大小,但默认值是相同的。

foreach (var s in reader.ReadAll())
    Console.WriteLine(s);
代码:

class EtxReader : IDisposable
{
    public IEnumerable<string> ReadAll()
    {
        string s;
        while ((s = ReadNext()) != null) yield return s;
    }
    public void Dispose()
    {
        if (socket != null) socket.Dispose();
        socket = null;
        if (backlog != null) backlog.Dispose();
        backlog = null;
        buffer = null;
        encoding = null;
    }
    public EtxReader(Socket socket, Encoding encoding = null, int bufferSize = 4096)
    {
        this.socket = socket;
        this.encoding = encoding ?? Encoding.UTF8;
        this.buffer = new byte[bufferSize];
    }
    private Encoding encoding;
    private Socket socket;
    int index, count;
    byte[] buffer;
    private bool ReadMore()
    {
        index = count = 0;
        int bytes = socket.Receive(buffer);
        if (bytes > 0)
        {
            count = bytes;
            return true;
        }
        return false;
    }
    public const byte ETX = 3;
    private MemoryStream backlog = new MemoryStream();
    public string ReadNext()
    {
        string s;
        if (count == 0)
        {
            if (!ReadMore()) return null;
        }
        // at this point, we expect there to be *some* data;
        // this may or may not include the ETX terminator
        var etxIndex = Array.IndexOf(buffer, ETX, index);
        if (etxIndex >= 0)
        {
            // found another message in the existing buffer
            int len = etxIndex - index;
            s = encoding.GetString(buffer, index, len);
            index = etxIndex + 1;
            count -= (len + 1);
            return s;
        }
        // no ETX in the buffer, so we'll need to fetch more data;
        // buffer the unconsumed data that we have
        backlog.SetLength(0);
        backlog.Write(buffer, index, count);
        bool haveEtx;
        do
        {
            if (!ReadMore())
            {
                // we had unused data; this must signal an error
                throw new EndOfStreamException();
            }
            etxIndex = Array.IndexOf(buffer, ETX, index);
            haveEtx = etxIndex >= 0;
            if (!haveEtx)
            {
                // keep buffering
                backlog.Write(buffer, index, count);
            }
        } while (!haveEtx);
        // now we have some data in the backlog, and the ETX in the buffer;
        // for convenience, copy the rest of the next message into
        // the backlog
        backlog.Write(buffer, 0, etxIndex);
        s = encoding.GetString(backlog.GetBuffer(), 0, (int)backlog.Length);
        index = etxIndex + 1;
        count -= (etxIndex + 1);
        return s;
    }
}

那么这大概是一个基于文本的API。这里使用NetworkStreamSocket之间没有实际的区别;StreamSocket都不会"读取所有消息"——只有你的代码才会这样做。

在这两种情况下,您都需要一个几乎相同的循环来获取下一个数据块(这与"消息"不是同义词),并开始寻找哨兵值(您是指ETX吗?)-根据需要处理或缓冲。除非您知道传入的提要是单字节编码,否则最好将其视为字节,直到您实际将其拆分为逻辑消息,并且只有在在其上运行文本解码器以获取此消息的文本,然后再移动到下一个。

您应该研究异步通信和TcpListener类。我的方法是:

    创建监听器
  1. 让它连续监听连接(BeginAccept/EndAccecpt)。
  2. 对于每个连接,从NetworkStream异步读取,直到客户端断开(BeginRead/EndRead)。你可以读取数据块,例如你可以尝试一次读取512字节-如果缓冲区中有少于512字节,你将得到少于512字节。
  3. 有任何进来附加到StringBuilder(一个为每个连接,注意正确的编码时转换byte[]string)
  4. 如果StringBuilder包含分隔符,将该消息分离并将其写入队列(在加入队列之前不要忘记锁定队列!)
  5. 有一个单独的线程持续监视新消息队列并处理它们。你也可以给线程发信号,如果你把一些新的东西放入队列,例如使用ManualResetEvent

这只是一个粗略的提纲,但我相信你已经明白了。

没有所谓的读取"消息"——所有通过TCP/IP传入的东西都只是一个字节流——这就是为什么你得到一个网络。消息是您用来解释传入数据的概念。