指导与TCP客户端,重新连接功能等

本文关键字:重新连接 功能 客户端 TCP | 更新日期: 2023-09-27 18:06:15

我有一个TCP客户端,主要运行在mono,我希望一些指导,我认为我做错了一些事情,不需要的东西,等。

下面的代码是我用来作为我的疑问的样本的一部分。

  • 正如你所看到的,一旦构造函数被调用,当我实例化ConcurrentQueues时,我应该让它自己实例化而不需要从构造函数中初始化它,或者我目前所做的方式是正确的,或者它真的无关紧要吗?

  • 我目前有3个线程运行,我相信我可以减少到2个甚至一个,但我有点不安全。

    如你所见,我有:

    对于_ReceivePackets

    receiveThread这个控件控制从roomserver接收到的所有数据

    sendThread_SendPackets这个选项控制所有必须发送到roomserver的内容

    responseThread for _Response这将处理从roomserver

    排队的所有响应。

    我相信我可以把_SendPackets_ReceivePackets合并为一个,增加到我的类SendPackets,无论是要发送的还是要交付的,我害怕的是如果它有一个巨大的in/out,如果它仍然会跟上而不把事情搞砸。

    我有_Response分离,因为它将处理更多的响应数据,每一种类型的回复,我认为是好的,不认为它会工作,如果我删除它,让_Response处理它自己,因为一些数据包不会在一个镜头读取。

  • 我应该在多大程度上依赖自己进入_socket.Connected ?

  • 我在部署重新连接时遇到了一些问题,大多数时候当我有一些连接问题时,它不会触发任何错误,它只是坐在那里,端口打开,好像它仍然连接,我该如何检测我是否还活着?

旁注:这是一个非常基本的聊天tcp客户端的实现,我目前正在研究。

using System;
using System.IO;
using System.Net;
using System.Text;
using System.Threading;
using System.Net.Sockets;
using System.Collections.Concurrent;
using log4net;
namespace Connection
{
    public class Roomserver
    {
        private static readonly ILog logger = LogManager.GetLogger(typeof(Roomserver));
        private ConcurrentQueue<byte[]> RoomserverReceivedPackets = null;
        private ConcurrentQueue<SendPackets> RoomserverSendPackets = null;
        private AutoResetEvent _queueNotifier = new AutoResetEvent(false);
        private AutoResetEvent _sendQueueNotifier = new AutoResetEvent(false);
        public static byte[] myinfo = null;
        private IPAddress _server = null;
        private int _port = 0;
        private int _roomID = 0;
        private Socket _socket;
        private Status _status = Status.Disconnected;
        private Thread responseThread = null;
        private Thread receiveThread = null;
        private Thread sendThread = null;
        private EndPoint _roomServer = null;
        public bool Connected
        {
            get { return _socket.Connected; }
        }
        public Status GetStatus
        {
            get { return _status; }
        }
        public Roomserver(IPAddress server, int port)
        {
            this._server = server;
            this._port = port;
            RoomserverReceivedPackets = new ConcurrentQueue<byte[]>();
            RoomserverSendPackets = new ConcurrentQueue<SendPackets>();
        }
        public Roomserver(IPAddress server, int port, int roomID)
        {
            this._server = server;
            this._port = port;
            this._roomID = roomID;
            RoomserverReceivedPackets = new ConcurrentQueue<byte[]>();
            RoomserverSendPackets = new ConcurrentQueue<SendPackets>();
        }
        public bool Connect()
        {
            try
            {
                if (_status != Status.Disconnected)
                    this.Disconnect();
                _socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
                IPEndPoint remoteEndPoint = new IPEndPoint(_server, _port);
                _socket.Connect(remoteEndPoint);
                _status = Status.Connect;
                _roomServer = (EndPoint)remoteEndPoint;
                receiveThread = new Thread(_ReceivePackets);
                receiveThread.Start();
                sendThread = new Thread(_SendPackets);
                sendThread.Start();
                responseThread = new Thread(_Response);
                responseThread.Start();
                return _socket.Connected;
            }
            catch (SocketException se)
            {
                logger.Error("Connect: " + se.ToString());
                _status = Status.Disconnected;
                return false;
            }
            catch (Exception ex)
            {
                logger.Error("Connect: " + ex.ToString());
                _status = Status.Disconnected;
                return false;
            }
        }
        public bool Disconnect()
        {
            if (_socket.Connected)
            {
                _status = Status.Disconnected;
                if (receiveThread != null && receiveThread.IsAlive)
                {
                    receiveThread.Abort();
                }
                if (responseThread != null && responseThread.IsAlive)
                {
                    responseThread.Abort();
                }
                if (sendThread != null && sendThread.IsAlive)
                {
                    sendThread.Abort();
                }
                try
                {
                    _socket.Close();
                    return true;
                }
                catch (Exception ex)
                {
                    logger.Info("Disconnect " + ex.ToString());
                    _status = Status.Disconnected;
                    return true;
                }
            }
            else
            {
                logger.Info("Not connected ...");
                _status = Status.Disconnected;
                return true;
            }
        }
        public bool SendData(byte[] bytes, bool delay)
        {
            try
            {
                SendPackets data = new SendPackets()
                {
                    Data = bytes,
                    Delay = delay
                };
                RoomserverSendPackets.Enqueue(data);
                _sendQueueNotifier.Set();
                return true;
            }
            catch (Exception ex)
            {
                logger.Error("SendData " + ex.ToString());
                return false;
            }
        }
        private void _SendPackets()
        {
            while (_socket.Connected)
            {
                _sendQueueNotifier.WaitOne();
                while (!RoomserverSendPackets.IsEmpty)
                {
                    SendPackets packet = null;
                    if (RoomserverSendPackets.TryDequeue(out packet))
                    {
                        try
                        {
                            if (packet.Delay)
                            {
                                Thread.Sleep(1000);
                                _socket.Send(packet.Data);
                            }
                            else
                                _socket.Send(packet.Data);
                        }
                        catch (SocketException soe)
                        {
                            logger.Error(soe.ToString());
                        }
                    }
                }
            }
        }
        private void _ReceivePackets()
        {
            bool extraData = false;
            MemoryStream fullPacket = null;
            int fullPacketSize = 0;
            while (_socket.Connected)
            {
                try
                {
                    byte[] bytes = new byte[65536];
                    int bytesRead = _socket.ReceiveFrom(bytes, ref _roomServer);
                    int packetSize = 0;
                    int reply = 0;
                    byte[] data = new byte[bytesRead];
                    Array.Copy(bytes, data, bytesRead);
                    MemoryStream bufferReceived = new MemoryStream(data, 0, data.Length);
                    using (var reader = new BinaryReader(bufferReceived))
                    {
                        packetSize = (int)reader.ReadInt32() + 4;
                        reply = (int)reader.ReadByte();
                    }
                    if (!extraData && packetSize <= bytesRead)
                    {
                        if (data.Length > 0)
                        {
                            RoomserverReceivedPackets.Enqueue(data);
                            _queueNotifier.Set();
                        }
                    }
                    else
                    {
                        if (!extraData)
                        {
                            fullPacket = new MemoryStream(new byte[packetSize], 0, packetSize);
                            fullPacket.Write(data, 0, data.Length);
                            fullPacketSize = data.Length;
                            extraData = true;
                        }
                        else
                        {
                            if (fullPacketSize < fullPacket.Length)
                            {
                                int left = (int)fullPacket.Length - fullPacketSize;
                                fullPacket.Write(data, 0, (left < data.Length) ? left : data.Length);
                                fullPacketSize += (left < data.Length) ? left : data.Length;
                                if (fullPacketSize >= fullPacket.Length)
                                {
                                    extraData = false;
                                    RoomserverReceivedPackets.Enqueue(fullPacket.ToArray());
                                    _queueNotifier.Set();
                                    fullPacket.Close();
                                }
                            }
                        }
                    }
                }
                catch (SocketException soe)
                {
                    logger.Error("_ReceivePackets " + soe.ToString());
                }
                catch (Exception ex)
                {
                    logger.Error("_ReceivePackets " + ex.ToString());
                }
            }
        }
        private void _Response()
        {
            while (_socket.Connected)
            {
                _queueNotifier.WaitOne();
                while (!RoomserverReceivedPackets.IsEmpty)
                {
                    byte[] data = null;
                    if (RoomserverReceivedPackets.TryDequeue(out data))
                    {
                        MemoryStream bufferReceived = new MemoryStream(data, 0, data.Length);
                        using (var reader = new BinaryReader(bufferReceived))
                        {
                            int packetSize = (int)reader.ReadInt32();
                            byte reply = reader.ReadByte();
                            switch (reply)
                            {
                                case 0x01: // Login request
                                    break;
                                case 0x02: // Login accepted
                                    break;
                                case 0x03: // Enter room
                                    break;
                                case 0x04: // Members list
                                    break;
                                case 0x05: // Send Chat
                                    break;
                                case 0x06: // Receive Chat
                                    break;
                                case 0x07: // Receive Announcement
                                    break;
                                case 0x08: // Send Announcement
                                    break;
                                case 0x09: // Wrong password errors
                                    _status = Status.RoomError;
                                    break;
                                case 0x10: // Send Whisper
                                    break;
                                case 0x11: // Receive Whisper
                                    break;
                                case 0x12: // Leave Room
                                    break;
                                case 0x13: // Disconnect
                                    break;
                            }
                        }
                    }
                }
            }
        }
    }
}

在另一个类中我有:

public class SendPackets
{
    public byte[] Data { get; set; }
    public bool Delay { get; set; }
}
public enum Status
{
    Disconnected = 0,
    Connect,
    EnterRequest,
    RoomError,
    Connected
}

指导与TCP客户端,重新连接功能等

  • 使用Dictionary<int, ICommandHandler>代替switch语句
  • 切换到异步套接字
  • 阅读单一责任原则。
  • 使用。net命名约定

如果你想要一个更具体的答案,再回来问一个更具体的问题。

更新回复评论

代替:

switch (reply)
{
    case 0x01: // Login request
        break;
    case 0x02: // Login accepted

:

public interface ICommandHandler
{
    void Handle(byte[] packet);
}
public class LoginHandler : ICommandHandler
{
    public void Handle(byte[] packet) 
    {
        // handle packet data here
    }
}
var myHandler = new LoginHandler();
myServer.Register(1, myHandler);

然后在套接字类中:

public class MyServer
{
     Dictionary<int, ICommandHandler> _handlers;
     public void Register(int functionId, ICommandHandler handler)
     {
          _handlers[functionId] = handler;
     }
     private void _Response()
     {
           // .. alot of stuff ..
           _handlers[reply].Handle(memoryStream);
     }

注意,这个例子还远远没有完成,您可能想要发送一个上下文类,而不仅仅是内存流。

我应该在多大程度上依赖自己进入_socket。连接吗?

Connected属性为您提供了关于上次操作时套接字状态的信息,因此,如果套接字在您上次尝试读写之后改变了状态,那么Connected将给您错误的(旧的)状态。

根据文档,你应该做一个零长度的发送来让。net更新套接字状态。此Send操作的成功将告诉您套接字是否仍处于连接状态。

在这样的应用程序中,通常只需要一个线程进行通信。如果您的应用程序所做的只是聊天,那么整个程序可能是单线程的。如果有读/写操作,它会阻塞你的控制台,但你可以通过异步读/写调用或在阻塞操作上设置超时来解决这个问题。在我看来,你对穿线有点太热心了。我给新程序员的建议是,如果你不确定是否需要多线程,那就从单线程方法开始,当你发现有阻塞的地方或可以通过多线程提高性能的地方时,然后切换。不要一开始就做。

我看到你使用了ReceiveFrom,这是为无连接协议设计的。尝试使用基本接收。您应该指定想要接收的字节数,否则可能会导致接收缓冲区溢出。在c#中,这表现为SocketException,你必须通过WinSock 2 API来找出错误代码是什么。最好是指定接收的最大大小,并将接收放入循环中。

我将重复另一个回答者所说的-使用单一责任原则。设计一个只有一个作业的类。对于您的设计,我将从一个为您的应用程序在更高级别封装套接字通信的类开始。然后我将这个类派生为一个服务器类,也许还有一个客户端类。然后你可以在你的"RoomServer"answers"RoomClient"类中使用这些类。这种关注点的分离应该迫使您将每个对象建模为现实世界中的对象——会话者和侦听者,它使您考虑每个对象需要什么,并且与类的主要工作无关的无关成员变量需要从类中删除并找到更好的位置。