C# 中的高性能 TCP 服务器
本文关键字:TCP 服务器 高性能 | 更新日期: 2023-09-27 17:56:30
我是一个经验丰富的C#开发人员,但到目前为止我还没有开发TCP服务器应用程序。现在,我必须开发一个高度可扩展和高性能的服务器,可以处理至少5-10千个并发连接:通过GPRS从GPS设备获取原始字节数据。
常见的通信过程应如下所示:
- GPS 设备启动与我的服务器的连接
- 如果我想获取数据,我的服务器会回答
- 设备发送全球定位系统数据
- 我的服务器向设备发送有关获取它的报告(SG 如校验和)
- 从GPS,报告获取新数据,这种情况一次又一次地发生
- 稍后 GPS 设备关闭连接
所以,在我的服务器中,我需要
- 跟踪已连接/活动客户端
- 从服务器端关闭任何客户端
- 当设备关闭连接时捕获事件
- 获取字节数据
- 向客户端发送数据
我开始在互联网上阅读这个话题,但这对我来说似乎是一场噩梦。有很多方法,但我找不到哪个是最好的。
异步套接字方法对我来说似乎是最好的,但是以这种异步风格编写代码很糟糕,而且不容易调试。
所以我的问题是:您认为哪种方法在 C# 中实现高性能 TCP 服务器的最佳方法?你知道有什么好的开源组件可以做到这一点吗?(我尝试了几个,但我找不到一个好的。
它必须是异步的,没有办法解决这个问题。高性能和可扩展性不会与每个插槽一个线程混合使用。你可以看看 StackExchange 自己在做什么,看看异步 Redis await BookSleeve,它利用了下一个 C# 版本的 CTP 功能(所以在边缘并且可能会发生变化,但它很酷)。为了更加前沿,解决方案围绕利用 SocketAsyncEventArgs 类而发展,该类通过消除与"经典"C# 异步处理相关的异步处理程序的频繁分配,使事情更进一步:
SocketAsyncEventArgs 类是一部分 的一组增强功能 System.Net.Sockets.Socket 类 提供替代异步 可以使用的模式 专用高性能插座 应用。这个类是 专为网络设计 要求较高的服务器应用程序 性能。应用程序可以使用 增强的异步模式 专门或仅在目标热门 区域(例如,接收时 大量数据)。
长话短说:学习异步或死亡尝试...
顺便说一句,如果你问为什么异步,那么请阅读这篇文章中链接的三篇文章:高性能Windows程序。最终的答案是:底层操作系统设计需要它。
正如 Remus 上面所说,你必须使用异步来保持高性能。那是开始.../结束....NET 中的方法。
在套接字的后台,这些方法利用IO完成端口,这似乎是在Windows操作系统上处理许多套接字的最高性能方式。
正如 Jim 所说,TcpClient 类可以在这里提供帮助,并且非常易于使用。下面是使用 TcpListener 侦听传入连接和 TcpClient 处理它们的示例,初始 BeginAccept 和 BeginRead 调用是异步的。
此示例确实假设在套接字上使用基于消息的协议,并且省略了该协议,除了每个传输的前 4 个字节是长度之外,但这允许您在流上使用同步读取来获取已缓冲的其余数据。
这是代码:
class ClientContext
{
public TcpClient Client;
public Stream Stream;
public byte[] Buffer = new byte[4];
public MemoryStream Message = new MemoryStream();
}
class Program
{
static void OnMessageReceived(ClientContext context)
{
// process the message here
}
static void OnClientRead(IAsyncResult ar)
{
ClientContext context = ar.AsyncState as ClientContext;
if (context == null)
return;
try
{
int read = context.Stream.EndRead(ar);
context.Message.Write(context.Buffer, 0, read);
int length = BitConverter.ToInt32(context.Buffer, 0);
byte[] buffer = new byte[1024];
while (length > 0)
{
read = context.Stream.Read(buffer, 0, Math.Min(buffer.Length, length));
context.Message.Write(buffer, 0, read);
length -= read;
}
OnMessageReceived(context);
}
catch (System.Exception)
{
context.Client.Close();
context.Stream.Dispose();
context.Message.Dispose();
context = null;
}
finally
{
if (context != null)
context.Stream.BeginRead(context.Buffer, 0, context.Buffer.Length, OnClientRead, context);
}
}
static void OnClientAccepted(IAsyncResult ar)
{
TcpListener listener = ar.AsyncState as TcpListener;
if (listener == null)
return;
try
{
ClientContext context = new ClientContext();
context.Client = listener.EndAcceptTcpClient(ar);
context.Stream = context.Client.GetStream();
context.Stream.BeginRead(context.Buffer, 0, context.Buffer.Length, OnClientRead, context);
}
finally
{
listener.BeginAcceptTcpClient(OnClientAccepted, listener);
}
}
static void Main(string[] args)
{
TcpListener listener = new TcpListener(new IPEndPoint(IPAddress.Any, 20000));
listener.Start();
listener.BeginAcceptTcpClient(OnClientAccepted, listener);
Console.Write("Press enter to exit...");
Console.ReadLine();
listener.Stop();
}
}
它演示了如何处理异步调用,但它需要添加错误处理,以确保 TcpListener 始终接受新连接,并在客户端意外断开连接时进行更多错误处理。此外,似乎确实存在一些情况,并非所有数据都一次性到达,也需要处理。
您可以使用 TcpClient 类执行此操作,尽管说实话,我不知道您是否可以有 10,000 个打开的套接字。这是相当多的。但我经常使用 TcpClient
来处理数十个并发套接字。异步模型实际上非常好用。
你最大的问题不是让TcpClient
工作。有 10,000 个并发连接,我认为带宽和可扩展性将成为问题。我什至不知道一台机器是否可以处理所有这些流量。我想这取决于数据包的大小以及它们进入的频率。但是,在承诺在一台计算机上实现所有这些内容之前,您最好先进行一些粗略的估计。
我想你也在寻找UDP技术。对于 10k 客户端,它很快,但问题是您必须为收到消息的每条消息实现 ACKnowledgement。在 UDP 中,您不需要为每个客户端打开套接字,但需要在 x 秒后实现心跳/ping 机制以检查哪个客户端已连接。
你可以使用我做的TCP CSharpServer,实现起来非常简单,只需在你的一个类上实现IClientRequest。
using System;
using System.Collections.Generic;
using System.Linq;
namespace cSharpServer
{
public interface IClientRequest
{
/// <summary>
/// this needs to be set, otherwise the server will not beable to handle the request.
/// </summary>
byte IdType { get; set; } // This is used for Execution.
/// <summary>
/// handle the process by the client.
/// </summary>
/// <param name="data"></param>
/// <param name="client"></param>
/// <returns></returns>
byte[] Process(BinaryBuffer data, Client client);
}
}
BinaryBuffer允许您非常轻松地读取发送到服务器的数据。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading.Tasks;
namespace cSharpServer
{
public class BinaryBuffer
{
private const string Str0001 = "You are at the End of File!";
private const string Str0002 = "You are Not Reading from the Buffer!";
private const string Str0003 = "You are Currenlty Writing to the Buffer!";
private const string Str0004 = "You are Currenlty Reading from the Buffer!";
private const string Str0005 = "You are Not Writing to the Buffer!";
private const string Str0006 = "You are trying to Reverse Seek, Unable to add a Negative value!";
private bool _inRead;
private bool _inWrite;
private List<byte> _newBytes;
private int _pointer;
public byte[] ByteBuffer;
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public override string ToString()
{
return Helper.DefaultEncoding.GetString(ByteBuffer, 0, ByteBuffer.Length);
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public BinaryBuffer(string data)
: this(Helper.DefaultEncoding.GetBytes(data))
{
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public BinaryBuffer()
{
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public BinaryBuffer(byte[] data)
: this(ref data)
{
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public BinaryBuffer(ref byte[] data)
{
ByteBuffer = data;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void IncrementPointer(int add)
{
if (add < 0)
{
throw new Exception(Str0006);
}
_pointer += add;
if (EofBuffer())
{
throw new Exception(Str0001);
}
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public int GetPointer()
{
return _pointer;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static string GetString(ref byte[] buffer)
{
return Helper.DefaultEncoding.GetString(buffer, 0, buffer.Length);
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static string GetString(byte[] buffer)
{
return GetString(ref buffer);
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void BeginWrite()
{
if (_inRead)
{
throw new Exception(Str0004);
}
_inWrite = true;
_newBytes = new List<byte>();
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void Write(float value)
{
if (!_inWrite)
{
throw new Exception(Str0005);
}
_newBytes.AddRange(BitConverter.GetBytes(value));
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void Write(byte value)
{
if (!_inWrite)
{
throw new Exception(Str0005);
}
_newBytes.Add(value);
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void Write(int value)
{
if (!_inWrite)
{
throw new Exception(Str0005);
}
_newBytes.AddRange(BitConverter.GetBytes(value));
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void Write(long value)
{
if (!_inWrite)
{
throw new Exception(Str0005);
}
byte[] byteArray = new byte[8];
unsafe
{
fixed (byte* bytePointer = byteArray)
{
*((long*)bytePointer) = value;
}
}
_newBytes.AddRange(byteArray);
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public int UncommitedLength()
{
return _newBytes == null ? 0 : _newBytes.Count;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void WriteField(string value)
{
Write(value.Length);
Write(value);
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void Write(string value)
{
if (!_inWrite)
{
throw new Exception(Str0005);
}
byte[] byteArray = Helper.DefaultEncoding.GetBytes(value);
_newBytes.AddRange(byteArray);
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void Write(decimal value)
{
if (!_inWrite)
{
throw new Exception(Str0005);
}
int[] intArray = decimal.GetBits(value);
Write(intArray[0]);
Write(intArray[1]);
Write(intArray[2]);
Write(intArray[3]);
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void SetInt(int value, int pos)
{
byte[] byteInt = BitConverter.GetBytes(value);
for (int i = 0; i < byteInt.Length; i++)
{
_newBytes[pos + i] = byteInt[i];
}
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void SetLong(long value, int pos)
{
byte[] byteInt = BitConverter.GetBytes(value);
for (int i = 0; i < byteInt.Length; i++)
{
_newBytes[pos + i] = byteInt[i];
}
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void Write(byte[] value)
{
Write(ref value);
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void Write(ref byte[] value)
{
if (!_inWrite)
{
throw new Exception(Str0005);
}
_newBytes.AddRange(value);
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void EndWrite()
{
if (ByteBuffer != null)
{
_newBytes.InsertRange(0, ByteBuffer);
}
ByteBuffer = _newBytes.ToArray();
_newBytes = null;
_inWrite = false;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void EndRead()
{
_inRead = false;
_pointer = 0;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void BeginRead()
{
if (_inWrite)
{
throw new Exception(Str0003);
}
_inRead = true;
_pointer = 0;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public byte ReadByte()
{
if (!_inRead)
{
throw new Exception(Str0002);
}
if (EofBuffer())
{
throw new Exception(Str0001);
}
return ByteBuffer[_pointer++];
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public int ReadInt()
{
if (!_inRead)
{
throw new Exception(Str0002);
}
if (EofBuffer(4))
{
throw new Exception(Str0001);
}
int startPointer = _pointer;
_pointer += 4;
return BitConverter.ToInt32(ByteBuffer, startPointer);
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public float[] ReadFloatArray()
{
float[] dataFloats = new float[ReadInt()];
for (int i = 0; i < dataFloats.Length; i++)
{
dataFloats[i] = ReadFloat();
}
return dataFloats;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public float ReadFloat()
{
if (!_inRead)
{
throw new Exception(Str0002);
}
if (EofBuffer(sizeof(float)))
{
throw new Exception(Str0001);
}
int startPointer = _pointer;
_pointer += sizeof(float);
return BitConverter.ToSingle(ByteBuffer, startPointer);
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public decimal ReadDecimal()
{
if (!_inRead)
{
throw new Exception(Str0002);
}
if (EofBuffer(16))
{
throw new Exception(Str0001);
}
return new decimal(new[] { ReadInt(),
ReadInt(),
ReadInt(),
ReadInt()
});
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public long ReadLong()
{
if (!_inRead)
{
throw new Exception(Str0002);
}
if (EofBuffer(8))
{
throw new Exception(Str0001);
}
int startPointer = _pointer;
_pointer += 8;
return BitConverter.ToInt64(ByteBuffer, startPointer);
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public string ReadString(int size)
{
return Helper.DefaultEncoding.GetString(ReadByteArray(size), 0, size);
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public byte[] ReadByteArray(int size)
{
if (!_inRead)
{
throw new Exception(Str0002);
}
if (EofBuffer(size))
{
throw new Exception(Str0001);
}
byte[] newBuffer = new byte[size];
Array.Copy(ByteBuffer, _pointer, newBuffer, 0, size);
_pointer += size;
return newBuffer;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public bool EofBuffer(int over = 1)
{
return ByteBuffer == null || ((_pointer + over) > ByteBuffer.Length);
}
}
}
完整项目在 GitHub CSharpServer 上