在c#中实现async/await模式的回调机制
本文关键字:模式 回调 机制 await 实现 async | 更新日期: 2023-09-27 18:04:16
如何正确地将以下回调驱动代码转换为async/await模式:
public class DeviceWrapper
{
// external device which provides real time stream of data
private InternalDevice device = new InternalDevice();
private List<int> accumulationBuffer = new List<int>();
public void StartReceiving()
{
// the following callback invocations might by synchronized by main
// UI message pump, particular window message pump
// or some other way
device.Synchronization = Synchronization.UI;
device.DataAvailable += DataAvailableHandler;
device.ReceivingStoppedOrErrorOccured += StopHandler;
device.Start();
}
private void DataAvailableHandler(object sender, DataEventArgs e)
{
// Filter data from e.Data and accumulate to accumulationBuffer field.
// If certail condition is met, signal pending task (if there is any)
//as complete return to the awaiting caller accumulationBuffer or perhaps temporary buffer created from accumulationBuffer
// in order to make it available to the caller.
// Handle also requested cancellation.
}
public Task<byte[]> GetData(CancellationToken token)
{
// create task returning data filtered and accumulated in DataAvailableHandler
}
}
// usage:
async void Test()
{
DeviceWrapper w = new DeviceWrapper();
w.StartReceiving();
while(true)
{
byte[] filteredData = await w.GetData(CancellationToken.Null);
Use(filteredData);
}
}
我通过阅读。net StreamReader类源来寻求解决这个问题的灵感,但它使我更加困惑。
感谢专家的建议!
您要找的是TaskCompletionSource<byte[]>
。这是它看起来的近似值:
public Task<byte[]> GetData(CancellationToken token)
{
cancellationToken.ThrowIfCancellationRequested;
var tcs = new TaskCompletionSource<byte[]>();
DataEventHandler dataHandler = null;
dataHandler = (o, e) =>
{
device.DataAvailable -= dataHandler;
tcs.SetResult(e.Data);
}
StopEventHandler stopHandler = null;
stopHandler = (os, se) =>
{
device.ReceivingStoppedOrErrorOccured -= stopHandler;
// Assuming stop handler has some sort of error property.
tcs.SetException(se.Exception);
}
device.DataAvailable += dataHandler;
device.ReceivingStoppedOrErrorOccured += stopHandler;
device.Start();
return tcs.Task;
}
如果你正确地使用async await,你的代码会容易得多:
首先:
- 如果你想调用异步函数,你应该自己异步
- 每个异步函数返回Task而不是void或Task
<TResult
>而不是result - 有一个例外:异步事件处理器可能返回void 在调用异步函数之后,你可以做其他事情,直到你需要答案。但是你不需要做其他事情。
- 一旦你需要等待Task的答案,结果就是result。
现在实现您的示例。有几种方法可以解决这个问题,但我认为这通常是一种生产者-消费者模式:我们有一个对象以独立于另一个对象的速度生产东西。
你可以自己创建,使用信号量来通知新数据,但是。net已经有了这样的东西:
System.Threading.Tasks.DataFlow.BufferBlock .
你需要下载一个microsoft nuget包。参见BufferBlock的MSDN描述中的备注。
BufferBlock是你发送类型为T的对象,而另一个任务等待类型为T的对象到达的对象。完全支持async/await。
发送方:
- bufferblock实现ITargetBlock
<T
>,其中T是它发送的类型。 - 可以将T类型的项发送到任意ITargetBlock
- 考虑使用ITargetBlock
<T
>作为属性使发送者成为一个单独的对象。 - 当它有数据分发:Post它,或SendAsync如果你想使用async/await。后面看到
消费者端:
- BufferBlock
<T
>实现了ISourceBlock<T
> - 消费者获取发送者发送对象的ISourceBlock,在本例中是发送者使用的BufferBlock。
- 启动时,使用Receive或ReceiveAsync等待数据到达。
让我们把它们放在一起:
public class DeviceWrapper
{
// external device which provides real time stream of data
private InternalDevice device = new InternalDevice();
// internal buffer replaced by the bufferBlock
BufferBlock<byte> bufferBlock = new BufferBlock<byte>()
public void StartReceiving() {...}
private async void DataAvailableHandler(object sender, DataEventArgs e)
{
// get the input and convert it to a byte
// post the byte to the buffer block asynchronously
byte byteToSend = ...
await this.bufferBlock.SendAsync(byteToSend);
}
public async Task<IEnumerable<byte>> GetData(CancellationToken token)
{
List<byte> receivedBytes = new List<byte>()
while (await this.BufferBlock.OutputAvailableAsync(token))
{ // a byte is available
byte b = await this.bufferBlock.ReceiveAsync(token);
receivedBytes.Add(b);
if (receivedBytes.Count > ...)
{
return receivedBytes;
}
// else: not enough bytes received yet, wait for more
}
}
}
async Task Test(CancellationToken token)
{
DeviceWrapper w = new DeviceWrapper();
w.StartReceiving();
while(NoStopRequested)
{
token.ThrowIfCancellationrequested();
var filteredData = await w.GetData(token);
Use(filteredData);
}
}
关于bufferblock还有很多要讲的,特别是在没有数据可用的情况下如何巧妙地停止它们。MSDN有几个关于这方面的例子。参见并行库中的DataFlow章节
https://msdn.microsoft.com/en-us/library/hh228603 (v = vs.110) . aspx