在c#中实现async/await模式的回调机制

本文关键字:模式 回调 机制 await 实现 async | 更新日期: 2023-09-27 18:04:16

如何正确地将以下回调驱动代码转换为async/await模式:

public class DeviceWrapper
{
 // external device which provides real time stream of data
 private InternalDevice device = new InternalDevice();
 private List<int> accumulationBuffer = new List<int>();
 public void StartReceiving()
 {
     // the following callback invocations might by synchronized by main
     // UI message pump, particular window message pump
     // or some other way
     device.Synchronization = Synchronization.UI;
     device.DataAvailable += DataAvailableHandler;
     device.ReceivingStoppedOrErrorOccured += StopHandler;
     device.Start();
 }
 private void DataAvailableHandler(object sender, DataEventArgs e)
 {
     // Filter data from e.Data and accumulate to accumulationBuffer field.
     // If certail condition is met, signal pending task (if there is any)
     //as complete return to the awaiting caller accumulationBuffer or perhaps temporary buffer created from accumulationBuffer
     // in order to make it available to the caller.
     // Handle also requested cancellation.
 }
 public Task<byte[]> GetData(CancellationToken token)
 {
     // create task returning data filtered and accumulated in DataAvailableHandler 
 }
}
// usage:
async void Test()
{
 DeviceWrapper w = new DeviceWrapper();
 w.StartReceiving();
 while(true)
 {
  byte[] filteredData = await w.GetData(CancellationToken.Null);
  Use(filteredData);
 }
}

我通过阅读。net StreamReader类源来寻求解决这个问题的灵感,但它使我更加困惑。

感谢专家的建议!

在c#中实现async/await模式的回调机制

您要找的是TaskCompletionSource<byte[]>。这是它看起来的近似值:

public Task<byte[]> GetData(CancellationToken token)
{
      cancellationToken.ThrowIfCancellationRequested;
      var tcs = new TaskCompletionSource<byte[]>();
      DataEventHandler dataHandler = null;
      dataHandler = (o, e) => 
      {
          device.DataAvailable -= dataHandler;
          tcs.SetResult(e.Data);
      }
      StopEventHandler stopHandler = null;
      stopHandler = (os, se) =>
      {
            device.ReceivingStoppedOrErrorOccured -= stopHandler;
            // Assuming stop handler has some sort of error property.
            tcs.SetException(se.Exception);
      }
      device.DataAvailable += dataHandler;
      device.ReceivingStoppedOrErrorOccured += stopHandler;
      device.Start();
      return tcs.Task;
}

如果你正确地使用async await,你的代码会容易得多:

首先:

  • 如果你想调用异步函数,你应该自己异步
  • 每个异步函数返回Task而不是void或Task <TResult>而不是result
  • 有一个例外:异步事件处理器可能返回void
  • 在调用异步函数之后,你可以做其他事情,直到你需要答案。但是你不需要做其他事情。
  • 一旦你需要等待Task的答案,结果就是result。

现在实现您的示例。有几种方法可以解决这个问题,但我认为这通常是一种生产者-消费者模式:我们有一个对象以独立于另一个对象的速度生产东西。

你可以自己创建,使用信号量来通知新数据,但是。net已经有了这样的东西:

System.Threading.Tasks.DataFlow.BufferBlock .

你需要下载一个microsoft nuget包。参见BufferBlock的MSDN描述中的备注。

BufferBlock是你发送类型为T的对象,而另一个任务等待类型为T的对象到达的对象。完全支持async/await。

发送方

:

  • bufferblock实现ITargetBlock <T>,其中T是它发送的类型。
  • 可以将T类型的项发送到任意ITargetBlock
  • 考虑使用ITargetBlock <T>作为属性使发送者成为一个单独的对象。
  • 当它有数据分发:Post它,或SendAsync如果你想使用async/await。后面看到

消费者端:

  • BufferBlock <T>实现了ISourceBlock <T>
  • 消费者获取发送者发送对象的ISourceBlock,在本例中是发送者使用的BufferBlock。
  • 启动时,使用Receive或ReceiveAsync等待数据到达。

让我们把它们放在一起:

public class DeviceWrapper
{
    // external device which provides real time stream of data
    private InternalDevice device = new InternalDevice();
    // internal buffer replaced by the bufferBlock
    BufferBlock<byte> bufferBlock = new BufferBlock<byte>()
    public void StartReceiving() {...}
    private async void DataAvailableHandler(object sender, DataEventArgs e)
    {
        // get the input and convert it to a byte
        // post the byte to the buffer block asynchronously
        byte byteToSend = ...
        await this.bufferBlock.SendAsync(byteToSend);
    }
    public async Task<IEnumerable<byte>> GetData(CancellationToken token)
    {
        List<byte> receivedBytes = new List<byte>()
        while (await this.BufferBlock.OutputAvailableAsync(token))
        {   // a byte is available
            byte b = await this.bufferBlock.ReceiveAsync(token);
            receivedBytes.Add(b);
            if (receivedBytes.Count > ...)
            {
                return receivedBytes;
            }
            // else: not enough bytes received yet, wait for more
        }
    }
}   
async Task Test(CancellationToken token)
{
    DeviceWrapper w = new DeviceWrapper();
    w.StartReceiving();
    while(NoStopRequested)
    {
        token.ThrowIfCancellationrequested();
        var filteredData = await w.GetData(token);
        Use(filteredData);
    }
}

关于bufferblock还有很多要讲的,特别是在没有数据可用的情况下如何巧妙地停止它们。MSDN有几个关于这方面的例子。参见并行库中的DataFlow章节

https://msdn.microsoft.com/en-us/library/hh228603 (v = vs.110) . aspx