使用异步操作的 TPL 数据流
本文关键字:TPL 数据流 异步操作 | 更新日期: 2023-09-27 18:36:52
我正在通过移植一些旧的套接字代码来尝试TPL数据流,以使用TPL数据流和新的异步功能。尽管 API 感觉坚如磐石,但我的代码最终仍然感觉混乱。我想知道我是否在这里错过了什么。
我的要求如下:套接字类公开:打开、关闭、发送和接收方法。所有返回一个任务,因此是异步的。打开和关闭是原子的。发送和接收可以彼此相邻工作,尽管两者都一次只能处理 1 个命令。
从逻辑上讲,这让我想到了内部控制的下一段代码:
// exposing an exclusive scheduler for connectivity related tasks and a parallel scheduler where send and receive can work with
private readonly ConcurrentExclusiveSchedulerPair exclusiveConnectionSchedulerPair;
private readonly ActionBlock<Action> connectionBlock;
private readonly ActionBlock<Action> sendBlock;
private readonly ActionBlock<Action> receiveBlock;
// within the constructor:
this.exclusiveConnectionSchedulerPair = new ConcurrentExclusiveSchedulerPair();
this.connectionBlock = new ActionBlock<Action>(action => action(), new ExecutionDataflowBlockOptions() { TaskScheduler = exclusiveConnectionSchedulerPair.ExclusiveScheduler });
this.sendBlock = new ActionBlock<Action>(action => action(), new ExecutionDataflowBlockOptions() { TaskScheduler = exclusiveConnectionSchedulerPair.ConcurrentScheduler });
this.receiveBlock = new ActionBlock<Action>(action => action(), new ExecutionDataflowBlockOptions() { TaskScheduler = exclusiveConnectionSchedulerPair.ConcurrentScheduler });
到目前为止一切都很好。我可以安全地将操作发送到发送和接收块,而不必担心在此期间运行与连接相关的操作。此外,ActionBlock 可确保要发送的多个调用是同步的(同上,用于接收、关闭和打开)。
问题在于,没有简单的方法让动作将任务传达回海报。现在我正在使用TaskCompletionSource来传达结果。喜欢:
public Task Send(ArraySegment<byte> buffer, CancellationToken cancellationToken)
{
TaskCompletionSource<object> resultCompletionSource = new TaskCompletionSource<object>();
sendBlock.Post(async () =>
{
if (!tcpClient.Connected)
throw new InvalidOperationException("Cant send when not open");
else
{
await sendStream.WriteAsync(buffer.Array, buffer.Offset, buffer.Count, cancellationToken);
resultCompletionSource.SetResult(null);
}
});
return resultCompletionSource.Task;
}
只是感觉丑陋和笨拙。我的问题是:有没有办法使用 TPL 同步工作流,而不必在两者之间进行通信?
谢谢!
首先,您根本不需要 TPL 数据流,因为您实际上没有任何数据流。
其次,像这样使用TaskScheduler
也不是正确的解决方案。 TaskScheduler
计划代码,但是当您await
某些内容时,没有代码运行。因此,当WriteAsync()
执行异步工作时,Open()
的代码可以运行。
您实际需要的是类似 ReaderWriterLock
,但这适用于 async
.框架中没有类似的东西,但你可以使用 Stephen Toub 的文章 Building Async Coordination Primitives, Part 7: AsyncReaderWriterLock,它完全符合你的需要。本文还更详细地解释了为什么使用TaskScheduler
是错误的。
使用 AsyncReaderWriterLock
,您的代码可能如下所示:
public async Task Send(ArraySegment<byte> buffer, CancellationToken cancellationToken)
{
using (await readerWriterLock.ReaderLockAsync())
{
if (!tcpClient.Connected)
throw new InvalidOperationException("Can't send when not open");
await sendStream.WriteAsync(buffer.Array, buffer.Offset, buffer.Count, cancellationToken);
}
}