C# 中的高性能 TCP 服务器

本文关键字:TCP 服务器 高性能 | 更新日期: 2023-09-27 17:56:30

我是一个经验丰富的C#开发人员,但到目前为止我还没有开发TCP服务器应用程序。现在,我必须开发一个高度可扩展和高性能的服务器,可以处理至少5-10千个并发连接:通过GPRS从GPS设备获取原始字节数据。

常见的通信过程应如下所示:

  • GPS 设备启动与我的服务器的连接
  • 如果我想获取数据,我的服务器会回答
  • 设备发送全球定位系统数据
  • 我的服务器向设备发送有关获取它的报告(SG 如校验和)
  • 从GPS,报告获取新数据,这种情况一次又一次地发生
  • 稍后 GPS 设备关闭连接

所以,在我的服务器中,我需要

  • 跟踪已连接/活动客户端
  • 从服务器端关闭任何客户端
  • 当设备关闭连接时捕获事件
  • 获取字节数据
  • 向客户端发送数据

我开始在互联网上阅读这个话题,但这对我来说似乎是一场噩梦。有很多方法,但我找不到哪个是最好的。

异步

套接字方法对我来说似乎是最好的,但是以这种异步风格编写代码很糟糕,而且不容易调试。

所以我的问题是:您认为哪种方法在 C# 中实现高性能 TCP 服务器的最佳方法?你知道有什么好的开源组件可以做到这一点吗?(我尝试了几个,但我找不到一个好的。

C# 中的高性能 TCP 服务器

它必须是异步的,没有办法解决这个问题。高性能和可扩展性不会与每个插槽一个线程混合使用。你可以看看 StackExchange 自己在做什么,看看异步 Redis await BookSleeve,它利用了下一个 C# 版本的 CTP 功能(所以在边缘并且可能会发生变化,但它很酷)。为了更加前沿,解决方案围绕利用 SocketAsyncEventArgs 类而发展,该类通过消除与"经典"C# 异步处理相关的异步处理程序的频繁分配,使事情更进一步:

SocketAsyncEventArgs 类是一部分 的一组增强功能 System.Net.Sockets.Socket 类 提供替代异步 可以使用的模式 专用高性能插座 应用。这个类是 专为网络设计 要求较高的服务器应用程序 性能。应用程序可以使用 增强的异步模式 专门或仅在目标热门 区域(例如,接收时 大量数据)。

长话短说:学习异步或死亡尝试...

顺便说一句,如果你问为什么异步,那么请阅读这篇文章中链接的三篇文章:高性能Windows程序。最终的答案是:底层操作系统设计需要它。

正如 Remus 上面所说,你必须使用异步来保持高性能。那是开始.../结束....NET 中的方法。

在套接字

的后台,这些方法利用IO完成端口,这似乎是在Windows操作系统上处理许多套接字的最高性能方式。

正如 Jim 所说,TcpClient 类可以在这里提供帮助,并且非常易于使用。下面是使用 TcpListener 侦听传入连接和 TcpClient 处理它们的示例,初始 BeginAccept 和 BeginRead 调用是异步的。

此示例确实假设在套接字上使用基于消息的协议,并且省略了该协议,除了每个传输的前 4 个字节是长度之外,但这允许您在流上使用同步读取来获取已缓冲的其余数据。

这是代码:

class ClientContext
{
    public TcpClient Client;
    public Stream Stream;
    public byte[] Buffer = new byte[4];
    public MemoryStream Message = new MemoryStream();
}
class Program
{
    static void OnMessageReceived(ClientContext context)
    {
        // process the message here
    }
    static void OnClientRead(IAsyncResult ar)
    {
        ClientContext context = ar.AsyncState as ClientContext;
        if (context == null)
            return;
        try
        {
            int read = context.Stream.EndRead(ar);
            context.Message.Write(context.Buffer, 0, read);
            int length = BitConverter.ToInt32(context.Buffer, 0);
            byte[] buffer = new byte[1024];
            while (length > 0)
            {
                read = context.Stream.Read(buffer, 0, Math.Min(buffer.Length, length));
                context.Message.Write(buffer, 0, read);
                length -= read;
            }
            OnMessageReceived(context);
        }
        catch (System.Exception)
        {
            context.Client.Close();
            context.Stream.Dispose();
            context.Message.Dispose();
            context = null;
        }
        finally
        {
            if (context != null)
                context.Stream.BeginRead(context.Buffer, 0, context.Buffer.Length, OnClientRead, context);
        }
    }
    static void OnClientAccepted(IAsyncResult ar)
    {
        TcpListener listener = ar.AsyncState as TcpListener;
        if (listener == null)
            return;
        try
        {
            ClientContext context = new ClientContext();
            context.Client = listener.EndAcceptTcpClient(ar);
            context.Stream = context.Client.GetStream();
            context.Stream.BeginRead(context.Buffer, 0, context.Buffer.Length, OnClientRead, context);
        }
        finally
        {
            listener.BeginAcceptTcpClient(OnClientAccepted, listener);
        }
    }
    static void Main(string[] args)
    {
        TcpListener listener = new TcpListener(new IPEndPoint(IPAddress.Any, 20000));
        listener.Start();
        listener.BeginAcceptTcpClient(OnClientAccepted, listener);
        Console.Write("Press enter to exit...");
        Console.ReadLine();
        listener.Stop();
    }
}

它演示了如何处理异步调用,但它需要添加错误处理,以确保 TcpListener 始终接受新连接,并在客户端意外断开连接时进行更多错误处理。此外,似乎确实存在一些情况,并非所有数据都一次性到达,也需要处理。

您可以使用 TcpClient 类执行此操作,尽管说实话,我不知道您是否可以有 10,000 个打开的套接字。这是相当多的。但我经常使用 TcpClient 来处理数十个并发套接字。异步模型实际上非常好用。

你最大的问题不是让TcpClient工作。有 10,000 个并发连接,我认为带宽和可扩展性将成为问题。我什至不知道一台机器是否可以处理所有这些流量。我想这取决于数据包的大小以及它们进入的频率。但是,在承诺在一台计算机上实现所有这些内容之前,您最好先进行一些粗略的估计。

我想你也在寻找UDP技术。对于 10k 客户端,它很快,但问题是您必须为收到消息的每条消息实现 ACKnowledgement。在 UDP 中,您不需要为每个客户端打开套接字,但需要在 x 秒后实现心跳/ping 机制以检查哪个客户端已连接。

你可以使用我做的TCP CSharpServer,实现起来非常简单,只需在你的一个类上实现IClientRequest。

using System;
using System.Collections.Generic;
using System.Linq;
namespace cSharpServer
{
    public interface IClientRequest
    {        
        /// <summary>
        /// this needs to be set, otherwise the server will not beable to handle the request.
        /// </summary>
        byte IdType { get; set; } // This is used for Execution.
        /// <summary>
        /// handle the process by the client.
        /// </summary>
        /// <param name="data"></param>
        /// <param name="client"></param>
        /// <returns></returns>
        byte[] Process(BinaryBuffer data, Client client);
    }
}

BinaryBuffer允许您非常轻松地读取发送到服务器的数据。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading.Tasks;
namespace cSharpServer
{
    public class BinaryBuffer
    {
        private const string Str0001 = "You are at the End of File!";
        private const string Str0002 = "You are Not Reading from the Buffer!";
        private const string Str0003 = "You are Currenlty Writing to the Buffer!";
        private const string Str0004 = "You are Currenlty Reading from the Buffer!";
        private const string Str0005 = "You are Not Writing to the Buffer!";
        private const string Str0006 = "You are trying to Reverse Seek, Unable to add a Negative value!";
        private bool _inRead;
        private bool _inWrite;
        private List<byte> _newBytes;
        private int _pointer;
        public byte[] ByteBuffer;
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public override string ToString()
        {
            return Helper.DefaultEncoding.GetString(ByteBuffer, 0, ByteBuffer.Length);
        }
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public BinaryBuffer(string data)
            : this(Helper.DefaultEncoding.GetBytes(data))
        {
        }
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public BinaryBuffer()
        {
        }
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public BinaryBuffer(byte[] data)
            : this(ref data)
        {
        }
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public BinaryBuffer(ref byte[] data)
        {
            ByteBuffer = data;
        }
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void IncrementPointer(int add)
        {
            if (add < 0)
            {
                throw new Exception(Str0006);
            }
            _pointer += add;
            if (EofBuffer())
            {
                throw new Exception(Str0001);
            }
        }
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public int GetPointer()
        {
            return _pointer;
        }
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public static string GetString(ref byte[] buffer)
        {
            return Helper.DefaultEncoding.GetString(buffer, 0, buffer.Length);
        }
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public static string GetString(byte[] buffer)
        {
            return GetString(ref buffer);
        }
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void BeginWrite()
        {
            if (_inRead)
            {
                throw new Exception(Str0004);
            }
            _inWrite = true;
            _newBytes = new List<byte>();
        }
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void Write(float value)
        {
            if (!_inWrite)
            {
                throw new Exception(Str0005);
            }
            _newBytes.AddRange(BitConverter.GetBytes(value));
        }
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void Write(byte value)
        {
            if (!_inWrite)
            {
                throw new Exception(Str0005);
            }
            _newBytes.Add(value);
        }
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void Write(int value)
        {
            if (!_inWrite)
            {
                throw new Exception(Str0005);
            }
            _newBytes.AddRange(BitConverter.GetBytes(value));
        }
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void Write(long value)
        {
            if (!_inWrite)
            {
                throw new Exception(Str0005);
            }
            byte[] byteArray = new byte[8];
            unsafe
            {
                fixed (byte* bytePointer = byteArray)
                {
                    *((long*)bytePointer) = value;
                }
            }
            _newBytes.AddRange(byteArray);
        }
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public int UncommitedLength()
        {
            return _newBytes == null ? 0 : _newBytes.Count;
        }
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void WriteField(string value)
        {
            Write(value.Length);
            Write(value);
        }
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void Write(string value)
        {
            if (!_inWrite)
            {
                throw new Exception(Str0005);
            }
            byte[] byteArray = Helper.DefaultEncoding.GetBytes(value);
            _newBytes.AddRange(byteArray);
        }
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void Write(decimal value)
        {
            if (!_inWrite)
            {
                throw new Exception(Str0005);
            }
            int[] intArray = decimal.GetBits(value);
            Write(intArray[0]);
            Write(intArray[1]);
            Write(intArray[2]);
            Write(intArray[3]);
        }
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void SetInt(int value, int pos)
        {
            byte[] byteInt = BitConverter.GetBytes(value);
            for (int i = 0; i < byteInt.Length; i++)
            {
                _newBytes[pos + i] = byteInt[i];
            }
        }
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void SetLong(long value, int pos)
        {
            byte[] byteInt = BitConverter.GetBytes(value);
            for (int i = 0; i < byteInt.Length; i++)
            {
                _newBytes[pos + i] = byteInt[i];
            }
        }
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void Write(byte[] value)
        {
            Write(ref value);
        }
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void Write(ref byte[] value)
        {
            if (!_inWrite)
            {
                throw new Exception(Str0005);
            }
            _newBytes.AddRange(value);
        }
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void EndWrite()
        {
            if (ByteBuffer != null)
            {
                _newBytes.InsertRange(0, ByteBuffer);
            }
            ByteBuffer = _newBytes.ToArray();
            _newBytes = null;
            _inWrite = false;
        }
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void EndRead()
        {
            _inRead = false;
            _pointer = 0;
        }
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void BeginRead()
        {
            if (_inWrite)
            {
                throw new Exception(Str0003);
            }
            _inRead = true;
            _pointer = 0;
        }
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public byte ReadByte()
        {
            if (!_inRead)
            {
                throw new Exception(Str0002);
            }
            if (EofBuffer())
            {
                throw new Exception(Str0001);
            }
            return ByteBuffer[_pointer++];
        }
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public int ReadInt()
        {
            if (!_inRead)
            {
                throw new Exception(Str0002);
            }
            if (EofBuffer(4))
            {
                throw new Exception(Str0001);
            }
            int startPointer = _pointer;
            _pointer += 4;
            return BitConverter.ToInt32(ByteBuffer, startPointer);
        }
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public float[] ReadFloatArray()
        {
            float[] dataFloats = new float[ReadInt()];
            for (int i = 0; i < dataFloats.Length; i++)
            {
                dataFloats[i] = ReadFloat();
            }
            return dataFloats;
        }
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public float ReadFloat()
        {
            if (!_inRead)
            {
                throw new Exception(Str0002);
            }
            if (EofBuffer(sizeof(float)))
            {
                throw new Exception(Str0001);
            }
            int startPointer = _pointer;
            _pointer += sizeof(float);
            return BitConverter.ToSingle(ByteBuffer, startPointer);
        }
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public decimal ReadDecimal()
        {
            if (!_inRead)
            {
                throw new Exception(Str0002);
            }
            if (EofBuffer(16))
            {
                throw new Exception(Str0001);
            }
            return new decimal(new[] { ReadInt(),
                ReadInt(),
                ReadInt(),
                ReadInt()
            });
        }
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public long ReadLong()
        {
            if (!_inRead)
            {
                throw new Exception(Str0002);
            }
            if (EofBuffer(8))
            {
                throw new Exception(Str0001);
            }
            int startPointer = _pointer;
            _pointer += 8;
            return BitConverter.ToInt64(ByteBuffer, startPointer);
        }
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public string ReadString(int size)
        {
            return Helper.DefaultEncoding.GetString(ReadByteArray(size), 0, size);
        }
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public byte[] ReadByteArray(int size)
        {
            if (!_inRead)
            {
                throw new Exception(Str0002);
            }
            if (EofBuffer(size))
            {
                throw new Exception(Str0001);
            }
            byte[] newBuffer = new byte[size];
            Array.Copy(ByteBuffer, _pointer, newBuffer, 0, size);
            _pointer += size;
            return newBuffer;
        }
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public bool EofBuffer(int over = 1)
        {
            return ByteBuffer == null || ((_pointer + over) > ByteBuffer.Length);
        }
    }
}

完整项目在 GitHub CSharpServer 上