将 Socket.ReceiveAsync 替换为 NetworkStream.ReadAsync(可等待)

本文关键字:等待 ReadAsync NetworkStream Socket ReceiveAsync 替换 | 更新日期: 2023-09-27 18:27:55

我有一个应用程序,它同时建立几百个TCP连接,并从它们接收源源不断的数据流。

 private void startReceive()
    {
        SocketAsyncEventArgs e = new SocketAsyncEventArgs();
        e.Completed += receiveCompleted;
        e.SetBuffer(new byte[1024], 0, 1024);
        if (!Socket.ReceiveAsync(e)) { receiveCompleted(this, e); }  
    }
    void receiveCompleted(object sender, SocketAsyncEventArgs e)
    {
        ProcessData(e);
        if (!Socket.ReceiveAsync(e)) { receiveCompleted(this, e); }
    }

我的尝试导致了这样的事情:

private async void StartReceive()
    {
        byte[] Buff = new byte[1024];
        int recv = 0;
        while (Socket.Connected)
        {
            recv = await NetworkStream.ReadAsync(Buff, 0, 1024);
            ProcessData(Buff,recv);
        }
    }

我遇到的问题是调用StartReceive()的方法会阻塞,并且无论如何都不会到达随附的 StartReceive(( . Creating a new task for StartReceive(( would just end up with 300-ish threads, and it seems to do so just by calling StartReceive((' 的随附StartSend() method called after

在使用NetworkStream时,在现有代码上实现新asyncawait关键字的正确方法是什么,因此它使用 Socket.SendAsync()Socket.ReceiveAsync() 正在使用的线程池来避免拥有数百个线程/任务?

以这种方式使用 networkstream 是否比具有 beginreceive 的 I/O 完成端口具有任何性能优势?

将 Socket.ReceiveAsync 替换为 NetworkStream.ReadAsync(可等待)

你在这里同时更改了两件事:异步样式(SocketAsyncEventArgsTask/async(和抽象级别(SocketNetworkStream(。

由于您已经熟悉Socket,我建议只更改异步样式,并继续直接使用 Socket 类。

异步 CTP 没有Socket提供任何async兼容的方法(这很奇怪;我认为它们被错误地遗漏了,并将添加到 .NET 4.5 中(。

如果您

使用我的 AsyncEx 库,创建自己的 ReceiveAsyncTask 扩展方法(以及其他操作的类似包装器(并不难:

public static Task<int> ReceiveAsyncTask(this Socket socket,
    byte[] buffer, int offset, int size)
{
  return AsyncFactory<int>.FromApm(socket.BeginReceive, socket.EndReceive,
      buffer, offset, size, SocketFlags.None);
}

一旦你这样做了,你的StartReceive可以这样写:

private async Task StartReceive()
{
  try
  {
    var buffer = new byte[1024];
    while (true)
    {
      var bytesReceived = await socket.ReceiveAsyncTask(buffer, 0, 1024)
          .ConfigureAwait(false);
      ProcessData(buffer, bytesReceived);
    }
  }
  catch (Exception ex)
  {
    // Handle errors here
  }
}

现在,解决许多小问题:

  • await不会生成新线程。我在我的博客上写了一个异步/等待介绍,就像许多其他人一样。 async/await允许并发,但这并不一定意味着多线程。
  • 数百个线程可能会出现问题。但是,数百个任务根本不是问题;线程池和 BCL 旨在处理许多任务。
  • async/await不是一种全新的异步处理形式;它只是表达异步处理的一种更简单的方法。它仍然在下面使用 IOCP。 async/await的性能略低于较低级别的方法;它的吸引力在于易于编写和编写异步方法。
  • 一个非常繁忙的系统在切换到 async/await 时可以看到一些增加的 GC 压力。并行团队的 Stephen Toub 写了一些特定于套接字的示例,可以帮助解决这个问题。(我建议先使用简单的模式,并且仅在您认为必要时使用性能增强的方法;不过,如果您最终需要它,知道它就在那里是件好事(。
  • 异步方法应返回Task,除非您确实需要它们返回voidTask是可等待的,因此您的方法是可组合的(并且更容易测试(; void更像是"一劳永逸"。
  • 可以调用 ConfigureAwait(false) 来告知其余的 async 方法在线程池线程上执行。我在上面的示例中使用它,以便ProcessData在线程池线程中执行,就像使用 SocketAsyncEventArgs 时一样。
  • Socket.Connected是没有用的。您需要发送数据以检测连接是否仍然有效。