有关异步套接字操作和消息框架的 .NET 问题

本文关键字:框架 NET 问题 消息 异步 套接字 操作 | 更新日期: 2023-09-27 17:57:10

我一直在寻找如何处理TCP消息框架的例子。我看到许多示例,其中NetworkStreams被传递到StreamReader或StreamWriter对象中,然后使用ReadLine或WriteLine方法处理"'"分隔的消息。我的应用程序协议包含以"'"结尾的消息,因此网络流似乎是要走的路。但是,我找不到任何关于与异步套接字结合使用处理所有这些的正确方法的具体示例。当下面调用 ReceiveCallback() 时,我如何实现 NetworkStream 和 StreamReader 类来处理消息成帧?根据我所读到的内容,我可能会在一次接收中收到一条消息的一部分,而在下一次接收中收到其余消息(包括"'")。这是否意味着我可以得到一条消息的结尾和下一条消息的一部分?当然,必须有一种更简单的方法来处理这个问题。

我有以下代码:

    private void StartRead(Socket socket)
    {
        try
        {
            StateObject state = new StateObject();
            state.AsyncSocket = socket;
            socket.BeginReceive(state.Buffer, 0, StateObject.BufferSize, 0, new AsyncCallback(ReceiveCallback), state);
        }
        catch (SocketException)
        {
            m_Socket.Shutdown(SocketShutdown.Both);
            Disconnect();
        }
    }
    private void ReceiveCallback(IAsyncResult ar)
    {
        try
        {
            StateObject state = (StateObject)ar.AsyncState;
            int bytes_read = state.AsyncSocket.EndReceive(ar);
            char[] chars = new char[bytes_read + 1];
            System.Text.Decoder decoder = System.Text.Encoding.UTF8.GetDecoder();
            int charLength = decoder.GetChars(state.Buffer, 0, bytes_read, chars, 0);
            String data = new String(chars);
            ParseMessage(data);
            StartRead(state.AsyncSocket);
        }
        catch (SocketException)
        {
            m_Socket.Shutdown(SocketShutdown.Both);
            Disconnect();
        }
    }

有关异步套接字操作和消息框架的 .NET 问题

块前面加上长度比使用分隔符更好。您不必处理任何类型的转义即可以换行符发送数据。

此答案现在可能与你无关,因为它使用 AsyncCTP 中的功能,这些功能仅在下一版本的 .net 中提供。但是,它确实使事情更加简洁。本质上,您编写的代码与同步情况的代码完全相同,但在存在异步调用的地方插入"await"语句。

    public static async Task<Byte[]> ReadChunkAsync(this Stream me) {
        var size = BitConverter.ToUInt32(await me.ReadExactAsync(4), 0);
        checked {
            return await me.ReadExactAsync((int)size);
        }
    }
    public static async Task<Byte[]> ReadExactAsync(this Stream me, int count) {
        var buf = new byte[count];
        var t = 0;
        while (t < count) {
            var n = await me.ReadAsync(buf, t, count - t);
            if (n <= 0) {
                if (t > 0) throw new IOException("End of stream (fragmented)");
                throw new IOException("End of stream");
            }
            t += n;
        }
        return buf;
    }
    public static void WriteChunk(this Stream me, byte[] buffer, int offset, int count) {
        me.Write(BitConverter.GetBytes(count), 0, 4);
        me.Write(buffer, offset, count);
    }

基本上,您创建一个缓冲区,每次接收数据时,您都会将该数据添加到缓冲区中,并确定您是否已经收到一条或多条完整的消息。

ReceiveCallbackStartRead 之间,您不会收到任何异步消息(传入的数据将自动在套接字级别缓冲),因此它是检查完整消息并将其从缓冲区中删除的理想位置。

所有变体都是可能的,包括接收消息 1 的结尾、消息 2 以及消息 3 的开头,所有这些都在一个块中。

我不建议对块进行 UTF8 解码,因为一个 UTF8 字符可能包含两个字节,如果它们在块之间拆分,您的数据可能会损坏。在这种情况下,您可以保留一个字节[]缓冲区(MemoryStream?)并在0x0A字节上拆分消息。

好的,这就是我最终所做的。我创建了一个读取器线程,该线程基于网络流创建NetworkStream和StreamReader。然后我使用StreamReader.ReadLine以这种方式读取行。这是一个同步调用,但它位于自己的线程中。它似乎工作得更好。我必须实现这一点,因为这是我们的应用程序协议(换行符分隔的消息)。我知道其他人会像我一样四处寻找答案,就像我在我的客户端类中所做的那样:

public class Client
{
    Socket              m_Socket;
    EventWaitHandle     m_WaitHandle;
    readonly object     m_Locker;
    Queue<IEvent>       m_Tasks;
    Thread              m_Thread;
    Thread              m_ReadThread;
    public Client()
    {
        m_WaitHandle = new AutoResetEvent(false);
        m_Locker = new object();
        m_Tasks = new Queue<IEvent>();
        m_Thread = new Thread(Run);
        m_Thread.IsBackground = true;
        m_Thread.Start();
    }
    public void EnqueueTask(IEvent task)
    {
        lock (m_Locker)
        {
            m_Tasks.Enqueue(task);
        }
        m_WaitHandle.Set();
    }
    private void Run()
    {
        while (true)
        {
            IEvent task = null;
            lock (m_Locker)
            {
                if (m_Tasks.Count > 0)
                {
                    task = m_Tasks.Dequeue();
                    if (task == null)
                    {
                        return;
                    }
                }
            }
            if (task != null)
            {
                task.DoTask(this);
            }
            else
            {
                m_WaitHandle.WaitOne();
            }
        }
    }
    public void Connect(string hostname, int port)
    {
        try
        {
            m_Socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
            IPAddress[] IPs = Dns.GetHostAddresses(hostname);
            m_Socket.BeginConnect(IPs, port, new AsyncCallback(ConnectCallback), m_Socket);
        }
        catch (SocketException)
        {
            m_Socket.Close();
            OnConnect(false, "Unable to connect to server.");
        }
    }
    private void ConnectCallback(IAsyncResult ar)
    {
        try
        {
            Socket socket = (Socket)ar.AsyncState;
            socket.EndConnect(ar);
            OnConnect(true, "Successfully connected to server.");
            m_ReadThread = new Thread(new ThreadStart(this.ReadThread));
            m_ReadThread.Name = "Read Thread";
            m_ReadThread.IsBackground = true;
            m_ReadThread.Start();
        }
        catch (SocketException)
        {
            m_Socket.Close();
            OnConnect(false, "Unable to connect to server.");
        }
    }
    void ReadThread()
    {
        NetworkStream networkStream = new NetworkStream(m_Socket);
        StreamReader reader = new StreamReader(networkStream);
        while (true)
        {
            try
            {
                String message = reader.ReadLine();
                // To keep the code thread-safe, enqueue a task in the CLient class thread to parse the message received.
                EnqueueTask(new ServerMessageEvent(message));
            }
            catch (IOException)
            {
                // The code will reach here if the server disconnects from the client. Make sure to cleanly shutdown...
                Disconnect();
                break;
            }
        }
    }
    ... Code for sending/parsing the message in the Client class thread.
}