C# 异步套接字:这是线程安全且正确完成的
本文关键字:安全 线程 套接字 异步 | 更新日期: 2023-09-27 17:56:30
我需要实现一个TCP客户端应用程序。客户端和服务器相互发送消息。我想使这个程序具有足够的可扩展性,可以同时处理与多个服务器的连接。似乎异步套接字是实现这一目标的方法。我是 C# 的新手,所以我很确定我不知道我在这里做什么。我编写了一些类和一个简单的控制台程序来开始使用。最终,我想创建一个 Windows 窗体应用程序,但我想先从小而简单开始。客户端类在其自己的线程中运行。这一切都是线程安全且正确完成的吗?这是很多代码,我试图减少一些脂肪。
程序.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
namespace FastEyeClient
{
class Program
{
static void Main(string[] args)
{
Client client = new Client();
client.ConnectEvent += new ConnectEventHandler(OnConnect);
client.SetLiveStatusEvent += new SetLiveStatusEventHandler(OnSetLiveStatus);
client.Connect("hostname", 1987);
Thread.Sleep(1000);
client.SetLiveStatus("hostname", true);
}
private static void OnConnect(object sender, ConnectEventArgs e)
{
Console.WriteLine(e.Message);
}
private static void OnSetLiveStatus(object sender, SetLiveStatusEventArgs e)
{
Console.WriteLine(e.Message);
}
}
}
客户端.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Net;
using System.Net.Sockets;
using System.Threading;
namespace FastEyeClient
{
public delegate void ConnectEventHandler(object sender, ConnectEventArgs e);
public delegate void SetLiveStatusEventHandler(object sender, SetLiveStatusEventArgs e);
public class Client : IDisposable
{
public event ConnectEventHandler ConnectEvent;
public event SetLiveStatusEventHandler SetLiveStatusEvent;
ServerManager m_Manager;
EventWaitHandle m_WaitHandle;
readonly object m_Locker;
Queue<Event> m_Tasks;
Thread m_Thread;
public Client()
{
m_Manager = new ServerManager(this);
m_WaitHandle = new AutoResetEvent(false);
m_Locker = new object();
m_Tasks = new Queue<Event>();
m_Thread = new Thread(Run);
m_Thread.Start();
}
public void EnqueueTask(Event task)
{
lock (m_Locker)
{
m_Tasks.Enqueue(task);
}
m_WaitHandle.Set();
}
public void Dispose()
{
EnqueueTask(null);
m_Thread.Join();
m_WaitHandle.Close();
}
private void Run()
{
while (true)
{
Event task = null;
lock (m_Locker)
{
if (m_Tasks.Count > 0)
{
task = m_Tasks.Dequeue();
if (task == null)
{
return;
}
}
}
if (task != null)
{
task.DoTask(m_Manager);
}
else
{
m_WaitHandle.WaitOne();
}
}
}
public void Connect(string hostname, int port)
{
EnqueueTask(new ConnectEvent(hostname, port));
}
public void SetLiveStatus(string hostname, bool status)
{
EnqueueTask(new SetLiveEvent(hostname, status));
}
public void OnConnect(bool isConnected, string message)
{
if (ConnectEvent != null)
{
ConnectEvent(this, new ConnectEventArgs(isConnected, message));
}
}
public void OnSetLiveStatus(string hostname, string message)
{
if (SetLiveStatusEvent != null)
{
SetLiveStatusEvent(this, new SetLiveStatusEventArgs(hostname, message));
}
}
}
}
服务器.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Net;
using System.Net.Sockets;
namespace FastEyeClient
{
public class Server
{
private ServerManager m_Manager;
private string m_Hostname;
private bool m_IsLive;
private class StateObject
{
public Socket AsyncSocket = null;
public const int BufferSize = 1024;
public byte[] Buffer = new byte[BufferSize];
public StringBuilder Builder = new StringBuilder();
}
public Server(ServerManager manager, Socket socket)
{
try
{
m_Manager = manager;
IPEndPoint endPoint = (IPEndPoint)socket.RemoteEndPoint;
IPAddress ipAddress = endPoint.Address;
IPHostEntry hostEntry = Dns.GetHostEntry(ipAddress);
Hostname = hostEntry.HostName;
IsLive = false;
StateObject state = new StateObject();
state.AsyncSocket = socket;
socket.BeginReceive(state.Buffer, 0, StateObject.BufferSize, 0, new AsyncCallback(ReceiveCallback), state);
}
catch (Exception)
{
}
}
public string Hostname
{
get
{
return m_Hostname;
}
set
{
m_Hostname = value;
}
}
public bool IsLive
{
get
{
return m_IsLive;
}
set
{
m_IsLive = value;
}
}
private void ReceiveCallback(IAsyncResult result)
{
try
{
StateObject state = (StateObject)result.AsyncState;
Socket socket = state.AsyncSocket;
int read = socket.EndReceive(result);
if (read > 0)
{
state.Builder.Append(Encoding.ASCII.GetString(state.Buffer, 0, read));
if (state.Builder.Length > 1)
{
string messages = state.Builder.ToString();
ParseMessages(messages);
}
}
StateObject newState = new StateObject();
newState.AsyncSocket = socket;
socket.BeginReceive(newState.Buffer, 0, StateObject.BufferSize, 0, new AsyncCallback(ReceiveCallback), newState);
}
catch (Exception)
{
}
}
private void ParseMessages(string messages)
{
string[] messagesArray = messages.Split(''n');
foreach (string message in messagesArray)
{
string[] tokens = message.Split(',');
if (tokens[0].Contains("@"))
{
ParseServerMessage(tokens);
}
}
}
private void ParseServerMessage(string[] tokens)
{
tokens[0].Remove(0, 1);
if (tokens[0] == "4")
{
bool status;
if (tokens[1] == "0")
{
status = false;
m_Manager.SetLiveStatus(m_Hostname, status);
}
else if (tokens[1] == "1")
{
status = true;
m_Manager.SetLiveStatus(m_Hostname, status);
}
}
}
}
}
服务器管理器.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Net;
using System.Net.Sockets;
namespace FastEyeClient
{
public class ServerManager
{
private Client m_Client;
private Dictionary<string, Server> m_Servers;
private object m_Locker;
public ServerManager(Client client)
{
m_Client = client;
m_Servers = new Dictionary<string, Server>();
m_Locker = new object();
}
public void AddServer(string hostname, int port)
{
try
{
IPAddress[] IPs = Dns.GetHostAddresses(hostname);
Socket socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
socket.BeginConnect(IPs, port, new AsyncCallback(ConnectCallback), socket);
}
catch (Exception)
{
bool isConnected = false;
string message = "Could not connect to server.";
m_Client.OnConnect(isConnected, message);
}
}
private void ConnectCallback(IAsyncResult ar)
{
bool isConnected;
string message;
try
{
Socket socket = (Socket)ar.AsyncState;
socket.EndConnect(ar);
IPEndPoint endPoint = (IPEndPoint)socket.RemoteEndPoint;
IPAddress ipAddress = endPoint.Address;
IPHostEntry hostEntry = Dns.GetHostEntry(ipAddress);
string hostname = hostEntry.HostName;
lock (m_Servers)
{
if (m_Servers.ContainsKey(hostname))
{
isConnected = false;
message = "Client is already connected to server";
}
else
{
m_Servers.Add(hostname, new Server(this, socket));
isConnected = true;
message = "Successfully connected.";
}
}
m_Client.OnConnect(isConnected, message);
}
catch (Exception)
{
isConnected = false;
message = "Could not connect to server.";
m_Client.OnConnect(isConnected, message);
}
}
public void SetLiveStatus(string hostname, bool newStatus)
{
string message;
lock (m_Locker)
{
if (m_Servers.ContainsKey(hostname))
{
if (m_Servers[hostname].IsLive == newStatus)
{
message = "Server is already set to this status.";
}
else
{
m_Servers[hostname].IsLive = newStatus;
message = "Successfully set new status.";
}
}
else
{
message = "Server not found.";
}
}
m_Client.OnSetLiveStatus(hostname, message);
}
}
}
- 它运行吗?
-
它会引发异常吗?
尝试在多个线程中运行服务器代码的陷阱:
避免尝试在不同的线程中操作、读取或写入套接字。 让一个线程接受来自服务器套接字的连接,并生成一个线程来处理事务。 如果你一次有太多线程,你将有 1 个线程处理多个套接字。
不,它不是线程安全的。
订阅者可以在检查和调用之前取消订阅:
if (ConnectEvent != null)
{
ConnectEvent(this, new ConnectEventArgs(isConnected, message));
}
将事件定义为:
public event ConnectEventHandler ConnectEvent = delegate{};
并删除事件以获取线程安全性。
我会将运行循环减少到:
private void Run()
{
while (true)
{
m_WaitHandle.WaitOne();
Event task = null;
lock (m_Locker)
{
if (m_Tasks.Count == 0)
{
m_WaitHandle.Reset();
continue;
}
task = m_Tasks.Dequeue();
}
task.DoTask(m_Manager);
}
}
- 循环将继续运行,直到事件重置。
- 确保队列中没有插入空项,而不是检查空值。
您可以通过使用 BlockingCollection
而不是AutoResetEvent
和普通旧Queue
的组合来简化 Client
类中的生产者-消费者模式。
EnqueueTask
方法如下所示:
public void EnqueueTask(Event task)
{
m_Queue.Add(task);
}
Run
方法如下所示:
public void Run()
{
while (true)
{
Event task = m_Queue.Take();
if (task == null)
{
return;
}
task.DoTask();
}
}