异步NetworkStream连续读取设计模式

本文关键字:设计模式 读取 连续 NetworkStream 异步 | 更新日期: 2023-09-27 18:13:22

我正在尝试将一个使用多个线程的类转换为使用重叠I/O。它几乎可以工作,但似乎随机遇到了线程问题,我不知道为什么。

太多的代码无法直接发布,但以下是基本模式。目标是坐在那里从连接中读取数据,直到连接被处理,所以当每个EndRead()完成时,它应该启动一个新的BeginRead()

public enum State
{
    Idle,
    BeforeRead,
    PendingRead,
    FinishingRead,
    Died,
}
private int state;
private IAsyncResult asyncResult;
private byte[] readBuffer = new byte[4096];
private System.Net.Sockets.NetworkStream stream;
public void Connect(System.Net.Sockets.TcpClient client, string host, int port)
{
    client.Connect(host, port);
    this.stream = client.GetStream();
}
private bool SetState(State expectedState, State newState)
{
    return Interlocked.CompareExchange(ref this.state, (int)newState, (int)expectedState) == expectedState;
}
public void BeginRead()
{
    try
    {
        while (true)
        {
            if (!SetState(State.Idle, State.BeforeRead))
                return;
            IAsyncResult async;
            async = stream.BeginRead(readBuffer, 0, readBuffer.Length, x => EndRead(true), null);
            if (async == null)
                return;
            SetState(State.BeforeRead, State.PendingRead);
            lock (this)
                this.asyncResult = async;
            if (async.AsyncWaitHandle.WaitOne(0))
                EndRead(false);
        }
    }
    catch { this.state = State.Died; }
}
private void EndRead(bool asynchronousCallback)
{
    try
    {
        if (!SetState(State.PendingRead, State.FinishingRead))
            return;
        IAsyncResult async;
        lock (this)
        {
            async = this.asyncResult;
            this.asyncResult = null;
        }
        if (async == null)
            return;
        int bytesRead = stream.EndRead(async);
        HandleData(bytesRead, readBuffer);
        SetState(State.FinishingRead, State.Idle);
        if (asynchronousCallback)
            BeginRead();
    }
    catch { this.state = State.Died; }
}

大多数时候它是有效的,但偶尔它会做以下几件事之一:

  • 停止接收消息
  • 抛出一个异常I为asyncResult has already been handled: "EndReceive can only be called once for each asynchronous operation"

我还应该提到的是,另一个线程正在进行同步写入(stream.Write,而不是stream.BeginWrite(。我认为阅读和写作应该相互独立,所以不应该影响行为。

我的设计有根本缺陷吗?这是一个精简的例子,所以我精简的东西可能会导致问题,但我需要知道我的基本设计是否有效。异步链式读取的正确方法是什么?

(如果建议使用async/await,则此代码需要在Windows XP上运行,因此这不是一个选项。(

异步NetworkStream连续读取设计模式

您有一个竞赛条件:

        IAsyncResult async;
        async = stream.BeginRead(readBuffer, 0, readBuffer.Length, x => EndRead(true), null);
        /* Race condition here */   
        if (async == null)
            return;
        SetState(State.BeforeRead, State.PendingRead);
        lock (this)
            this.asyncResult = async;

您的EndRead可以在SetState和/或this.asyncResult = async执行之前执行。你不能这样做。必须在发布BeginRead之前设置状态,并在出现故障时重置。不要保留和使用成员asyncResult,而是将回调传递给BeginRead,并在回调中获得异步结果:

  SetState(State.BeforeRead, State.PendingRead);
  stream.BeginRead(readBuffer, 0, readBuffer.Length, EndRead);
 ...
  private void EndRead(IAsyncResult asyncResult) {
     int bytesRead = stream.EndRead(asyncResult);
     ...
  }