C# 异步套接字:这样做是线程安全且正确的吗?

本文关键字:安全 线程 套接字 异步 | 更新日期: 2023-09-27 17:56:30

我需要实现一个 TCP 客户端应用程序。客户端和服务器相互发送消息。我希望这个程序具有足够的可扩展性,能够同时处理与多个服务器的连接。异步套接字似乎是实现这一目标的方法。我是 C# 新手,所以很可能并不清楚自己在做什么。我编写了几个类和一个简单的控制台程序作为起步。最终我想创建一个 Windows 窗体应用程序,但想先从小而简单的程序开始。Client 类在它自己的线程中运行。这些代码是线程安全且实现正确的吗?代码比较多,我已经尽量做了精简。

程序.cs

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading;
    namespace FastEyeClient
    {
        /// <summary>
        /// Console driver: connects a <see cref="Client"/> to one server, requests a
        /// live-status change and prints every notification the client raises.
        /// </summary>
        class Program
        {
            /// <summary>
            /// Entry point. Queues a connect request, waits briefly, then queues a
            /// live-status change for the same host.
            /// </summary>
            /// <param name="args">Command-line arguments (unused).</param>
            static void Main(string[] args)
            {
                // Client is IDisposable and runs its own worker thread. Disposing it
                // queues the shutdown sentinel and joins that thread; without this the
                // worker would wait for further tasks forever.
                using (Client client = new Client())
                {
                    client.ConnectEvent += new ConnectEventHandler(OnConnect);
                    client.SetLiveStatusEvent += new SetLiveStatusEventHandler(OnSetLiveStatus);
                    client.Connect("hostname", 1987);
                    // NOTE(review): fixed delay, presumably to let the asynchronous
                    // connect finish before the status change is queued.
                    Thread.Sleep(1000);
                    client.SetLiveStatus("hostname", true);
                }
            }

            /// <summary>Prints the message of a connect notification.</summary>
            private static void OnConnect(object sender, ConnectEventArgs e)
            {
                Console.WriteLine(e.Message);
            }

            /// <summary>Prints the message of a live-status notification.</summary>
            private static void OnSetLiveStatus(object sender, SetLiveStatusEventArgs e)
            {
                Console.WriteLine(e.Message);
            }
        }
    }

客户端.cs

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Net;
    using System.Net.Sockets;
    using System.Threading;
    namespace FastEyeClient
    {
        /// <summary>Signature of handlers subscribed to <c>Client.ConnectEvent</c>.</summary>
        public delegate void ConnectEventHandler(object sender, ConnectEventArgs e);
        /// <summary>Signature of handlers subscribed to <c>Client.SetLiveStatusEvent</c>.</summary>
        public delegate void SetLiveStatusEventHandler(object sender, SetLiveStatusEventArgs e);
        /// <summary>
        /// Queues requests (connect, set-live-status) and executes them one at a time
        /// on a dedicated worker thread against a <see cref="ServerManager"/>.
        /// Results are reported through <see cref="ConnectEvent"/> and
        /// <see cref="SetLiveStatusEvent"/>.
        /// </summary>
        public class Client : IDisposable
        {
            /// <summary>Raised by <see cref="OnConnect"/> with the outcome of a connect attempt.</summary>
            public event ConnectEventHandler ConnectEvent;
            /// <summary>Raised by <see cref="OnSetLiveStatus"/> with the outcome of a status change.</summary>
            public event SetLiveStatusEventHandler SetLiveStatusEvent;

            readonly ServerManager m_Manager;
            readonly EventWaitHandle m_WaitHandle;
            // Guards m_Tasks and m_Disposed.
            readonly object m_Locker;
            readonly Queue<Event> m_Tasks;
            readonly Thread m_Thread;
            bool m_Disposed;

            /// <summary>Creates the task queue and starts the worker thread.</summary>
            public Client()
            {
                m_Manager = new ServerManager(this);
                m_WaitHandle = new AutoResetEvent(false);
                m_Locker = new object();
                m_Tasks = new Queue<Event>();
                m_Thread = new Thread(Run);
                m_Thread.Start();
            }

            /// <summary>
            /// Appends a task to the queue and wakes the worker thread.
            /// </summary>
            /// <param name="task">Task to run; <c>null</c> is the shutdown sentinel that makes the worker exit.</param>
            public void EnqueueTask(Event task)
            {
                lock (m_Locker)
                {
                    m_Tasks.Enqueue(task);
                }
                m_WaitHandle.Set();
            }

            /// <summary>
            /// Stops the worker thread after it has processed everything queued so far,
            /// then releases the wait handle. Safe to call more than once.
            /// </summary>
            public void Dispose()
            {
                lock (m_Locker)
                {
                    // A second call would otherwise signal an already-closed wait handle.
                    if (m_Disposed)
                    {
                        return;
                    }
                    m_Disposed = true;
                }
                EnqueueTask(null);
                m_Thread.Join();
                m_WaitHandle.Close();
            }

            /// <summary>
            /// Worker loop: runs queued tasks in FIFO order, blocks on the wait handle
            /// when the queue is empty, and returns when the null sentinel is dequeued.
            /// </summary>
            private void Run()
            {
                while (true)
                {
                    Event task = null;
                    lock (m_Locker)
                    {
                        if (m_Tasks.Count > 0)
                        {
                            task = m_Tasks.Dequeue();
                            if (task == null)
                            {
                                return;
                            }
                        }
                    }
                    if (task != null)
                    {
                        // Executed outside the lock so producers are never blocked by a running task.
                        task.DoTask(m_Manager);
                    }
                    else
                    {
                        m_WaitHandle.WaitOne();
                    }
                }
            }

            /// <summary>Queues a connect request for the given host and port.</summary>
            public void Connect(string hostname, int port)
            {
                EnqueueTask(new ConnectEvent(hostname, port));
            }

            /// <summary>Queues a live-status change request for the given host.</summary>
            public void SetLiveStatus(string hostname, bool status)
            {
                EnqueueTask(new SetLiveEvent(hostname, status));
            }

            /// <summary>Raises <see cref="ConnectEvent"/>, if anyone is subscribed.</summary>
            /// <param name="isConnected">Whether the connection was established.</param>
            /// <param name="message">Human-readable outcome.</param>
            public void OnConnect(bool isConnected, string message)
            {
                // Snapshot the delegate: a subscriber removing itself on another thread
                // between the null check and the call must not cause a NullReferenceException.
                ConnectEventHandler handler = ConnectEvent;
                if (handler != null)
                {
                    handler(this, new ConnectEventArgs(isConnected, message));
                }
            }

            /// <summary>Raises <see cref="SetLiveStatusEvent"/>, if anyone is subscribed.</summary>
            /// <param name="hostname">Host the status change was requested for.</param>
            /// <param name="message">Human-readable outcome.</param>
            public void OnSetLiveStatus(string hostname, string message)
            {
                // Same snapshot pattern as OnConnect.
                SetLiveStatusEventHandler handler = SetLiveStatusEvent;
                if (handler != null)
                {
                    handler(this, new SetLiveStatusEventArgs(hostname, message));
                }
            }
        }
    }

服务器.cs

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Net;
    using System.Net.Sockets;
    namespace FastEyeClient
    {
        /// <summary>
        /// One connected remote server: stores its host name and live flag, keeps an
        /// asynchronous receive pending on the socket, and forwards parsed status
        /// messages to the owning <see cref="ServerManager"/>.
        /// </summary>
        public class Server
        {
            private ServerManager m_Manager;
            private string m_Hostname;
            private bool m_IsLive;

            /// <summary>State object passed through each asynchronous receive.</summary>
            private class StateObject
            {
                public Socket AsyncSocket = null;
                public const int BufferSize = 1024;
                public byte[] Buffer = new byte[BufferSize];
                public StringBuilder Builder = new StringBuilder();
            }

            /// <summary>
            /// Resolves the remote host name of <paramref name="socket"/> and starts the
            /// first asynchronous receive. Failures are traced, not thrown.
            /// </summary>
            /// <param name="manager">Manager that receives status updates parsed from the socket.</param>
            /// <param name="socket">Already-connected socket to read from.</param>
            public Server(ServerManager manager, Socket socket)
            {
                try
                {
                    m_Manager = manager;
                    IPEndPoint endPoint = (IPEndPoint)socket.RemoteEndPoint;
                    IPAddress ipAddress = endPoint.Address;
                    IPHostEntry hostEntry = Dns.GetHostEntry(ipAddress);
                    Hostname = hostEntry.HostName;
                    IsLive = false;
                    StateObject state = new StateObject();
                    state.AsyncSocket = socket;
                    socket.BeginReceive(state.Buffer, 0, StateObject.BufferSize, 0, new AsyncCallback(ReceiveCallback), state);
                }
                catch (Exception ex)
                {
                    // Kept non-throwing as before, but no longer silent.
                    System.Diagnostics.Trace.TraceError("Server initialisation failed: " + ex);
                }
            }

            /// <summary>Host name resolved from the socket's remote end point.</summary>
            public string Hostname
            {
                get { return m_Hostname; }
                set { m_Hostname = value; }
            }

            /// <summary>Last live status recorded for this server.</summary>
            public bool IsLive
            {
                get { return m_IsLive; }
                set { m_IsLive = value; }
            }

            /// <summary>
            /// Completes a receive, parses whatever arrived and starts the next receive.
            /// Stops (and closes the socket) when the peer closes the connection or
            /// the socket fails.
            /// </summary>
            private void ReceiveCallback(IAsyncResult result)
            {
                Socket socket = null;
                try
                {
                    StateObject state = (StateObject)result.AsyncState;
                    socket = state.AsyncSocket;
                    int read = socket.EndReceive(result);
                    if (read == 0)
                    {
                        // Zero bytes means the remote side shut the connection down.
                        // Re-arming the receive here would complete immediately, forever.
                        socket.Close();
                        return;
                    }

                    state.Builder.Append(Encoding.ASCII.GetString(state.Buffer, 0, read));
                    if (state.Builder.Length > 1)
                    {
                        try
                        {
                            ParseMessages(state.Builder.ToString());
                        }
                        catch (Exception ex)
                        {
                            // A bad message or a throwing subscriber must not end the receive loop.
                            System.Diagnostics.Trace.TraceError("Failed to process server message: " + ex);
                        }
                    }

                    // NOTE(review): each receive is parsed on its own; a message split
                    // across two receives is not reassembled.
                    StateObject newState = new StateObject();
                    newState.AsyncSocket = socket;
                    socket.BeginReceive(newState.Buffer, 0, StateObject.BufferSize, 0, new AsyncCallback(ReceiveCallback), newState);
                }
                catch (ObjectDisposedException)
                {
                    // Socket was closed locally while a receive was pending; nothing left to do.
                }
                catch (Exception ex)
                {
                    System.Diagnostics.Trace.TraceError("Receive failed: " + ex);
                    if (socket != null)
                    {
                        socket.Close();
                    }
                }
            }

            /// <summary>
            /// Splits received text into newline-separated messages and dispatches
            /// those whose first comma-separated token contains '@'.
            /// </summary>
            private void ParseMessages(string messages)
            {
                string[] messagesArray = messages.Split('\n');
                foreach (string message in messagesArray)
                {
                    string[] tokens = message.Split(',');
                    if (tokens[0].Contains("@"))
                    {
                        ParseServerMessage(tokens);
                    }
                }
            }

            /// <summary>
            /// Handles one server message. Code "4" (first token without its leading
            /// character) carries the live status in the second token: "0" or "1".
            /// Anything else is ignored.
            /// </summary>
            private void ParseServerMessage(string[] tokens)
            {
                if (tokens.Length < 2 || tokens[0].Length == 0)
                {
                    return;
                }

                // Strings are immutable: Remove returns the new value, it does not edit tokens[0].
                string code = tokens[0].Remove(0, 1);
                if (code != "4")
                {
                    return;
                }

                if (tokens[1] == "0")
                {
                    m_Manager.SetLiveStatus(m_Hostname, false);
                }
                else if (tokens[1] == "1")
                {
                    m_Manager.SetLiveStatus(m_Hostname, true);
                }
            }
        }
    }

服务器管理器.cs

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Net;
    using System.Net.Sockets;
    namespace FastEyeClient
    {
        /// <summary>
        /// Tracks connected servers by host name, creates connections asynchronously
        /// and reports outcomes back through the owning <see cref="Client"/>.
        /// </summary>
        public class ServerManager
        {
            private readonly Client m_Client;
            private readonly Dictionary<string, Server> m_Servers;
            // Single lock guarding every access to m_Servers.
            private readonly object m_Locker;

            /// <param name="client">Client whose OnConnect / OnSetLiveStatus methods receive results.</param>
            public ServerManager(Client client)
            {
                m_Client = client;
                m_Servers = new Dictionary<string, Server>();
                m_Locker = new object();
            }

            /// <summary>
            /// Resolves <paramref name="hostname"/> and starts an asynchronous connect.
            /// The result is reported via <c>Client.OnConnect</c>.
            /// </summary>
            public void AddServer(string hostname, int port)
            {
                Socket socket = null;
                try
                {
                    IPAddress[] IPs = Dns.GetHostAddresses(hostname);
                    socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
                    socket.BeginConnect(IPs, port, new AsyncCallback(ConnectCallback), socket);
                }
                catch (Exception)
                {
                    // The connect never started, so nobody else will release the socket.
                    if (socket != null)
                    {
                        socket.Close();
                    }
                    m_Client.OnConnect(false, "Could not connect to server.");
                }
            }

            /// <summary>
            /// Completes the connect started by <see cref="AddServer"/>, registers the
            /// server under its resolved host name and notifies the client exactly once.
            /// </summary>
            private void ConnectCallback(IAsyncResult ar)
            {
                Socket socket = (Socket)ar.AsyncState;
                bool isConnected;
                string message;
                try
                {
                    socket.EndConnect(ar);
                    IPEndPoint endPoint = (IPEndPoint)socket.RemoteEndPoint;
                    IPAddress ipAddress = endPoint.Address;
                    IPHostEntry hostEntry = Dns.GetHostEntry(ipAddress);
                    string hostname = hostEntry.HostName;
                    // Must be the same lock object SetLiveStatus uses, otherwise the
                    // dictionary can be read and modified concurrently.
                    lock (m_Locker)
                    {
                        if (m_Servers.ContainsKey(hostname))
                        {
                            isConnected = false;
                            message = "Client is already connected to server";
                        }
                        else
                        {
                            m_Servers.Add(hostname, new Server(this, socket));
                            isConnected = true;
                            message = "Successfully connected.";
                        }
                    }
                }
                catch (Exception)
                {
                    isConnected = false;
                    message = "Could not connect to server.";
                }

                if (!isConnected)
                {
                    // Failed or duplicate connection: nothing owns this socket, so release it.
                    socket.Close();
                }

                // Raised outside the try block so a throwing subscriber cannot trigger a
                // second, contradictory "could not connect" notification.
                m_Client.OnConnect(isConnected, message);
            }

            /// <summary>
            /// Sets the live flag of the server registered under <paramref name="hostname"/>
            /// and reports the outcome via <c>Client.OnSetLiveStatus</c>.
            /// </summary>
            public void SetLiveStatus(string hostname, bool newStatus)
            {
                string message;
                lock (m_Locker)
                {
                    Server server;
                    if (!m_Servers.TryGetValue(hostname, out server))
                    {
                        message = "Server not found.";
                    }
                    else if (server.IsLive == newStatus)
                    {
                        message = "Server is already set to this status.";
                    }
                    else
                    {
                        server.IsLive = newStatus;
                        message = "Successfully set new status.";
                    }
                }
                // Notify outside the lock so subscribers cannot block other callers.
                m_Client.OnSetLiveStatus(hostname, message);
            }
        }
    }

C# 异步套接字:这样做是线程安全且正确的吗?

  1. 它运行吗?
  2. 它会引发异常吗?

    尝试在多个线程中运行服务器代码的陷阱:

避免在不同的线程中操作、读取或写入同一个套接字。让一个线程负责接受服务器套接字上的连接,并为每个连接生成一个线程来处理事务。如果同时存在的线程过多,则改为让一个线程处理多个套接字。

不,它不是线程安全的。

订阅者可能在空值检查之后、调用之前取消订阅:

// Not thread-safe: if the last subscriber unsubscribes on another thread after
// the null check but before the call, ConnectEvent is null when it is invoked.
if (ConnectEvent != null)
{
    ConnectEvent(this, new ConnectEventArgs(isConnected, message));
}

将事件定义为:

// Initialised with an empty anonymous delegate so the event is never null and
// can be invoked without a null check.
public event ConnectEventHandler ConnectEvent = delegate{};

并删除空值检查(if 语句),即可获得线程安全性。


我会将运行循环减少到:

// Worker loop: blocks until m_WaitHandle is signaled, then runs every queued task.
// Never returns, and assumes no null task is ever enqueued.
private void Run()
{
    while (true)
    {
        m_WaitHandle.WaitOne();

        // Drain the whole queue per wake-up. Several producers may signal before
        // this thread wakes, and an already-signaled AutoResetEvent records that
        // only once; taking a single task per wake-up would strand the rest.
        while (true)
        {
            Event task;
            lock (m_Locker)
            {
                if (m_Tasks.Count == 0)
                {
                    // Reset while holding the lock: every task enqueued before this
                    // point has been taken, and later ones will signal again.
                    m_WaitHandle.Reset();
                    break;
                }
                task = m_Tasks.Dequeue();
            }
            // Run outside the lock so producers are not blocked by a running task.
            task.DoTask(m_Manager);
        }
    }
}
  1. 循环将继续运行,直到事件重置。
  2. 确保队列中没有插入空项,而不是检查空值。

您可以通过使用 BlockingCollection 而不是AutoResetEvent和普通旧Queue的组合来简化 Client 类中的生产者-消费者模式。

EnqueueTask方法如下所示:

// Hands the task to the consumer by adding it to m_Queue
// (presumably a BlockingCollection<Event>, per the surrounding text).
// No explicit lock or wait-handle signal appears in this version.
public void EnqueueTask(Event task)
{
  m_Queue.Add(task);
}

Run方法如下所示:

// Consumer loop: takes tasks from m_Queue one at a time (Take blocks while the
// queue is empty) and returns when the null shutdown sentinel is taken.
public void Run()
{
  while (true)
  {
    Event task = m_Queue.Take();
    if (task == null)
    {
      return;
    }
    // Pass the manager, matching the DoTask(m_Manager) call used by the other Run implementations.
    task.DoTask(m_Manager);
  }
}