如何“刷新”TCP 客户端缓冲区

本文关键字:客户端 缓冲区 TCP 刷新 如何 | 更新日期: 2023-09-27 18:36:07

我参考了几个示例,整理出一个连接到服务器的全双工 C# TCP 客户端。基本思路是客户端和服务器都可以发送/接收消息(命令和事件)。为此,我开发了一个 FullDuplexSocket 类,它公开了一个用于向服务器发送消息的 Send 方法,以及一个用于接收服务器消息的事件。一切正常,只是我似乎无法刷新(清空)从服务器接收消息所用的缓冲区:每次服务器发送新消息时,套接字的缓冲区中都同时包含所有(已读取的)旧消息以及新消息。我可以按已知的分隔符(\r\n)拆分消息并自行跟踪它们,但在长时间运行的通信中,这很可能成为内存问题的根源。[编辑:代码已更新为不再存在缓冲区问题且工作正常的解决方案]。

有人有任何建议吗? 完全重写??? 代码在下面,希望它能帮助其他人。

下面是 FullDuplexSocket 类:

using System;
using System.Text;
using System.Net.Sockets;
namespace Utilities
{
    /// <summary>
    /// State carried through an asynchronous receive: the socket being read and
    /// the byte buffer that each read fills.
    /// </summary>
    public class StateObject
    {
        /// <summary>Size, in bytes, of the receive buffer.</summary>
        public const int BUFFER_SIZE = 1024;

        /// <summary>Socket the receive is issued on; null until the owner assigns it.</summary>
        public Socket workSocket;

        /// <summary>Receive buffer, allocated with <see cref="BUFFER_SIZE"/> bytes.</summary>
        public byte[] buffer = new byte[StateObject.BUFFER_SIZE];
    }
    /// <summary>
    /// Full-duplex TCP client. <see cref="Send"/> writes a CR LF terminated message to the
    /// server, while data arriving from the server is decoded and published through
    /// <see cref="OnMessageReceived"/>. <see cref="OnDisconnect"/> is raised when the
    /// connection ends.
    /// </summary>
    /// <remarks>
    /// Receiving starts inside the constructor, before any handler can be attached; data
    /// that arrives before a handler is subscribed is not delivered. Each event carries the
    /// text decoded from one socket read, which is not necessarily one complete message.
    /// </remarks>
    public class FullDuplexSocket : IDisposable
    {
        public event NewMessageHandler OnMessageReceived;
        public delegate void NewMessageHandler(string Message);
        public event DisconnectHandler OnDisconnect;
        public delegate void DisconnectHandler(string Reason);

        /// <summary>Terminator appended to every outgoing message.</summary>
        private const string MessageTerminator = "\r\n";

        private readonly Socket _socket;
        private readonly bool _useASCII = true;
        private readonly string _remoteServerIp = "";
        private readonly int _port = 0;

        // Stateful decoder: keeps the trailing bytes of a character that was split across
        // two socket reads (possible with UTF-16) and completes it on the next read.
        private readonly Decoder _decoder;

        // 0 = live, 1 = Dispose has run. Accessed through Interlocked because Dispose and the
        // receive callback can run on different threads.
        private int _disposed;

        /// <summary>
        /// Constructor of a full duplex client socket. The consumer should immediately attach
        /// an event handler to the OnMessageReceived event after construction has completed.
        /// </summary>
        /// <param name="RemoteServerIp">The remote Ip address of the server.</param>
        /// <param name="Port">The port that will be used to transfer/receive messages to/from the remote IP.</param>
        /// <param name="UseASCII">The character type to encode/decode messages.  Defaulted to use ASCII, but setting the value to false will encode/decode messages in UNICODE.</param>
        /// <exception cref="Exception">
        /// The connection could not be established or the first receive could not be started.
        /// The original failure is available as the inner exception.
        /// </exception>
        public FullDuplexSocket(string RemoteServerIp, int Port, bool UseASCII = true)
        {
            _useASCII = UseASCII;
            _remoteServerIp = RemoteServerIp;
            _port = Port;
            _decoder = (UseASCII ? Encoding.ASCII : Encoding.Unicode).GetDecoder();

            try //to create the socket and connect
            {
                _socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
                _socket.Connect(RemoteServerIp, _port);
            }
            catch (Exception e)
            {
                CloseQuietly(_socket); //do not leak the handle of a socket that never connected
                throw new Exception("Unable to connect to the remote Ip.", e);
            }

            try //to listen to the socket
            {
                StateObject stateObject = new StateObject();
                stateObject.workSocket = _socket;
                StartReceive(stateObject);
            }
            catch (Exception e)
            {
                CloseQuietly(_socket);
                throw new Exception("Unable to start listening to the socket.", e);
            }
        }

        /// <summary>
        /// Receive callback. Completes the pending read, decodes the received bytes, fires the
        /// OnMessageReceived event and starts the next read. A zero-byte read closes the socket
        /// and fires OnDisconnect; a failing read fires OnDisconnect with the failure message,
        /// unless the failure was caused by this instance being disposed (Dispose reports that
        /// itself).
        /// </summary>
        /// <param name="asyncResult">Result of the BeginReceive call; its AsyncState is the StateObject of the read.</param>
        public void ReadFromSocket(IAsyncResult asyncResult)
        {
            StateObject stateObject = (StateObject)asyncResult.AsyncState; //pull out the state object
            int bytesReceived;
            try //to receive the message.
            {
                bytesReceived = stateObject.workSocket.EndReceive(asyncResult);
            }
            catch (Exception e)  //Exception will occur if connection was forcibly closed.
            {
                ReportReceiveFailure(e);
                return;
            }

            if (bytesReceived <= 0)
            {
                CloseQuietly(stateObject.workSocket);
                RaiseOnDisconnect("Socket closed normally.");
                return;
            }

            string text = Decode(stateObject.buffer, bytesReceived);
            if (text.Length > 0) //a read may hold only part of a character; wait for the rest
            {
                RaiseOnMessageReceived(text);
            }

            try //to wait for the next chunk
            {
                StartReceive(stateObject);
            }
            catch (Exception e) //socket was closed or failed between the two reads
            {
                ReportReceiveFailure(e);
            }
        }

        /// <summary>
        /// Send a message to the server. Consumer should handle any exceptions thrown by the socket.
        /// </summary>
        /// <param name="Message">The message to be sent will be encoded using the character set defined during construction.</param>
        public void Send(string Message)
        {
            //All messages are terminated with CR LF. Environment.NewLine is not used because
            //it is "\n" on non-Windows platforms, which would change the wire format.
            string framed = Message + MessageTerminator;
            byte[] bytesToSend = _useASCII ?
                Encoding.ASCII.GetBytes(framed) :
                Encoding.Unicode.GetBytes(framed);
            _socket.Send(bytesToSend);
        }

        /// <summary>
        /// Clean up the socket and fire OnDisconnect. Calling it more than once has no further effect.
        /// </summary>
        void IDisposable.Dispose()
        {
            if (System.Threading.Interlocked.Exchange(ref _disposed, 1) != 0)
            {
                return;
            }

            CloseQuietly(_socket);
            RaiseOnDisconnect("Socket closed via Dispose method.");
        }

        /// <summary>
        /// Posts one asynchronous read into the buffer of the given state object.
        /// </summary>
        private void StartReceive(StateObject stateObject)
        {
            stateObject.workSocket.BeginReceive
                (
                    stateObject.buffer, //Buffer to load in our state object
                    0, //Start at the first position in the byte array
                    StateObject.BUFFER_SIZE, //only load up to the max per read
                    SocketFlags.None, //Set socket flags here if necessary
                    new AsyncCallback(ReadFromSocket), //Who to call when data arrives
                    stateObject //state object to use when data arrives
                );
        }

        /// <summary>
        /// Decodes the first <paramref name="count"/> bytes of <paramref name="bytes"/> with the
        /// stateful decoder, so characters split across reads are reassembled.
        /// </summary>
        private string Decode(byte[] bytes, int count)
        {
            char[] chars = new char[_decoder.GetCharCount(bytes, 0, count)];
            int produced = _decoder.GetChars(bytes, 0, count, chars, 0);
            return new string(chars, 0, produced);
        }

        /// <summary>
        /// Fires OnDisconnect for a failed read, unless Dispose is already running or done.
        /// </summary>
        private void ReportReceiveFailure(Exception e)
        {
            bool disposed = System.Threading.Interlocked.CompareExchange(ref _disposed, 0, 0) != 0;
            if (!disposed)
            {
                RaiseOnDisconnect(e.Message);
            }
        }

        /// <summary>
        /// Closes a socket without letting teardown errors escape; a null socket is ignored.
        /// </summary>
        private static void CloseQuietly(Socket socket)
        {
            if (socket == null)
            {
                return;
            }

            try
            {
                socket.Close();
            }
            catch (SocketException) { } //best-effort teardown: nothing useful can be done here
            catch (ObjectDisposedException) { } //already closed
        }

        /// <summary>
        /// Raises OnMessageReceived if a handler is attached. An exception thrown by a handler
        /// is traced and not rethrown, so a faulty subscriber cannot stop the receive loop.
        /// </summary>
        /// <param name="Message">Decoded text to publish.</param>
        private void RaiseOnMessageReceived(string Message)
        {
            NewMessageHandler handler = OnMessageReceived; //local copy: safe against concurrent unsubscribe
            if (handler == null)
            {
                return;
            }

            try
            {
                handler(Message);
            }
            catch (Exception e)
            {
                System.Diagnostics.Trace.TraceError("OnMessageReceived handler failed: " + e);
            }
        }

        /// <summary>
        /// Raises OnDisconnect if a handler is attached. An exception thrown by a handler is
        /// traced and not rethrown, because this runs from Dispose and from the receive callback.
        /// </summary>
        /// <param name="Message">Reason for the disconnect.</param>
        private void RaiseOnDisconnect(string Message)
        {
            DisconnectHandler handler = OnDisconnect; //local copy: safe against concurrent unsubscribe
            if (handler == null)
            {
                return;
            }

            try
            {
                handler(Message);
            }
            catch (Exception e)
            {
                System.Diagnostics.Trace.TraceError("OnDisconnect handler failed: " + e);
            }
        }
    }
}

该类的使用者只需执行以下操作:

using System;
namespace Utilities
{
    /// <summary>
    /// Minimal example of consuming FullDuplexSocket: connects, subscribes to incoming
    /// messages, sends a greeting and prints every received message to the console.
    /// </summary>
    public class SocketConsumer : IDisposable
    {
        private readonly FullDuplexSocket _fds;

        /// <summary>
        /// Connects to the server, attaches the message handler and sends "Hello World!".
        /// </summary>
        public SocketConsumer()
        {
            _fds = new FullDuplexSocket("192.168.1.103", 4555);
            _fds.OnMessageReceived += fds_OnMessageReceived;
            try
            {
                _fds.Send("Hello World!");
            }
            catch
            {
                //The instance is never handed to the caller when the constructor throws,
                //so release the connection here before propagating the failure.
                ((IDisposable)_fds).Dispose();
                throw;
            }
        }

        /// <summary>
        /// Detaches the message handler and closes the underlying connection.
        /// </summary>
        public void Dispose()
        {
            _fds.OnMessageReceived -= fds_OnMessageReceived;
            //FullDuplexSocket implements Dispose explicitly, hence the cast.
            ((IDisposable)_fds).Dispose();
        }

        /// <summary>
        /// Prints a received message to the console.
        /// </summary>
        /// <param name="Message">Text received from the server.</param>
        private void fds_OnMessageReceived(string Message)
        {
            Console.WriteLine("Message: {0}", Message);
        }
    }

任何帮助都会很棒。 谢谢!

如何“刷新”TCP 客户端缓冲区

即使缓冲区尚未填满(即 bytesRead < count 的情况),您也会调用 OnMessageReceived。请考虑将应用程序的异步部分改用 await 来实现,这样可以摆脱可怕的回调递归。