异步NetworkStream连续读取设计模式
本文关键字:设计模式 读取 连续 NetworkStream 异步 | 更新日期: 2023-09-27 18:13:22
我正在尝试将一个使用多个线程的类转换为使用重叠I/O。它几乎可以工作,但似乎随机遇到了线程问题,我不知道为什么。
太多的代码无法直接发布,但以下是基本模式。目标是坐在那里从连接中读取数据,直到连接被处理,所以当每个EndRead()
完成时,它应该启动一个新的BeginRead()
。
public enum State
{
Idle,
BeforeRead,
PendingRead,
FinishingRead,
Died,
}
private int state;
private IAsyncResult asyncResult;
private byte[] readBuffer = new byte[4096];
private System.Net.Sockets.NetworkStream stream;
public void Connect(System.Net.Sockets.TcpClient client, string host, int port)
{
client.Connect(host, port);
this.stream = client.GetStream();
}
private bool SetState(State expectedState, State newState)
{
return Interlocked.CompareExchange(ref this.state, (int)newState, (int)expectedState) == expectedState;
}
public void BeginRead()
{
try
{
while (true)
{
if (!SetState(State.Idle, State.BeforeRead))
return;
IAsyncResult async;
async = stream.BeginRead(readBuffer, 0, readBuffer.Length, x => EndRead(true), null);
if (async == null)
return;
SetState(State.BeforeRead, State.PendingRead);
lock (this)
this.asyncResult = async;
if (async.AsyncWaitHandle.WaitOne(0))
EndRead(false);
}
}
catch { this.state = State.Died; }
}
private void EndRead(bool asynchronousCallback)
{
try
{
if (!SetState(State.PendingRead, State.FinishingRead))
return;
IAsyncResult async;
lock (this)
{
async = this.asyncResult;
this.asyncResult = null;
}
if (async == null)
return;
int bytesRead = stream.EndRead(async);
HandleData(bytesRead, readBuffer);
SetState(State.FinishingRead, State.Idle);
if (asynchronousCallback)
BeginRead();
}
catch { this.state = State.Died; }
}
大多数时候它是有效的,但偶尔它会做以下几件事之一:
- 停止接收消息
- 抛出一个异常I为
asyncResult has already been handled: "EndReceive can only be called once for each asynchronous operation"
我还应该提到的是,另一个线程正在进行同步写入(stream.Write,而不是stream.BeginWrite
(。我认为阅读和写作应该相互独立,所以不应该影响行为。
我的设计有根本缺陷吗?这是一个精简的例子,所以我精简的东西可能会导致问题,但我需要知道我的基本设计是否有效。异步链式读取的正确方法是什么?
(如果建议使用async/await
,则此代码需要在Windows XP上运行,因此这不是一个选项。(
您有一个竞赛条件:
IAsyncResult async;
async = stream.BeginRead(readBuffer, 0, readBuffer.Length, x => EndRead(true), null);
/* Race condition here */
if (async == null)
return;
SetState(State.BeforeRead, State.PendingRead);
lock (this)
this.asyncResult = async;
您的EndRead
可以在SetState
和/或this.asyncResult = async
执行之前执行。你不能这样做。必须在发布BeginRead
之前设置状态,并在出现故障时重置。不要保留和使用成员asyncResult
,而是将回调传递给BeginRead
,并在回调中获得异步结果:
SetState(State.BeforeRead, State.PendingRead);
stream.BeginRead(readBuffer, 0, readBuffer.Length, EndRead);
...
private void EndRead(IAsyncResult asyncResult) {
int bytesRead = stream.EndRead(asyncResult);
...
}