使用 async/await 和 TPL 数据流的收益返回

本文关键字:数据流 收益 返回 TPL async await 使用 | 更新日期: 2023-09-27 18:34:21

我正在尝试使用 TPL Dataflow 实现数据处理管道。但是,我对数据流相对较新,并不完全确定如何正确使用它来解决我试图解决的问题。

问题

我正在尝试遍历文件列表并处理每个文件以读取一些数据,然后进一步处理该数据。每个文件的大小大致为 700MB1GB。每个文件都包含JSON数据。为了并行处理这些文件而不是运行内存,我正在尝试将IEnumerable<>yield return一起使用,然后进一步处理数据。

获得文件列表后,我想一次最多并行处理 4-5 个文件。我的困惑来自:

  • 如何将IEnumerable<>yeild returnasync/await和数据流配合使用。svick 遇到了这个答案,但仍然不确定如何将IEnumerable<>转换为ISourceBlock然后将所有块链接在一起并跟踪完成情况。
  • 就我而言,producer会非常快(浏览文件列表(,但consumer会非常慢(处理每个文件 - 读取数据,反序列化JSON(。在本例中,如何跟踪完成情况。
  • 我应该使用数据块LinkTo功能来连接各种块?还是使用OutputAvailableAsync()ReceiveAsync()等方法将数据从一个块传播到另一个块。

代码

private const int ProcessingSize= 4;
private BufferBlock<string> _fileBufferBlock;
private ActionBlock<string> _processingBlock;
private BufferBlock<DataType> _messageBufferBlock;
public Task ProduceAsync()
{
    PrepareDataflow(token);
    var bufferTask = ListFilesAsync(_fileBufferBlock, token);
    var tasks = new List<Task> { bufferTask, _processingBlock.Completion };
    return Task.WhenAll(tasks);
}
private async Task ListFilesAsync(ITargetBlock<string> targetBlock, CancellationToken token)
{
    ...
    // Get list of file Uris
    ...
    foreach(var fileNameUri in fileNameUris)
        await targetBlock.SendAsync(fileNameUri, token);
    targetBlock.Complete();
}
private async Task ProcessFileAsync(string fileNameUri, CancellationToken token)
{
    var httpClient = new HttpClient();
    try
    {
        using (var stream = await httpClient.GetStreamAsync(fileNameUri))
        using (var sr = new StreamReader(stream))
        using (var jsonTextReader = new JsonTextReader(sr))
        {
            while (jsonTextReader.Read())
            {
                if (jsonTextReader.TokenType == JsonToken.StartObject)
                {
                    try
                    {
                        var data = _jsonSerializer.Deserialize<DataType>(jsonTextReader)
                        await _messageBufferBlock.SendAsync(data, token);
                    }
                    catch (Exception ex)
                    {
                        _logger.Error(ex, $"JSON deserialization failed - {fileNameUri}");
                    }
                }
            }
        }
    }
    catch(Exception ex)
    {
        // Should throw?
        // Or if converted to block then report using Fault() method?
    }
    finally
    {
        httpClient.Dispose();
        buffer.Complete();
    }
}
private void PrepareDataflow(CancellationToken token)
{
    _fileBufferBlock = new BufferBlock<string>(new DataflowBlockOptions
    {
        CancellationToken = token
    });
    var actionExecuteOptions = new ExecutionDataflowBlockOptions
    {
        CancellationToken = token,
        BoundedCapacity = ProcessingSize,
        MaxMessagesPerTask = 1,
        MaxDegreeOfParallelism = ProcessingSize
    };
    _processingBlock = new ActionBlock<string>(async fileName =>
    {
        try
        {
            await ProcessFileAsync(fileName, token);
        }
        catch (Exception ex)
        {
            _logger.Fatal(ex, $"Failed to process fiel: {fileName}, Error: {ex.Message}");
            // Should fault the block?
        }
    }, actionExecuteOptions);
    _fileBufferBlock.LinkTo(_processingBlock, new DataflowLinkOptions { PropagateCompletion = true });
    _messageBufferBlock = new BufferBlock<DataType>(new ExecutionDataflowBlockOptions
    {
        CancellationToken = token,
        BoundedCapacity = 50000
    });
    _messageBufferBlock.LinkTo(DataflowBlock.NullTarget<DataType>());
}

在上面的代码中,我没有使用IEnumerable<DataType>yield return,因为我不能将其与async/await一起使用。所以我将输入缓冲区链接到ActionBlock<DataType>而又发布到另一个队列。但是通过使用ActionBlock<>,我无法将其链接到下一个块进行处理,而必须手动从ActionBlock<> Post/SendAsyncBufferBlock<>。另外,在这种情况下,不确定如何跟踪完成情况。

这段代码有效,但是,我相信还有比这更好的解决方案,我可以链接所有块(而不是ActionBlock<DataType>然后从中发送消息到BufferBlock<DataType>(

另一种选择可能是使用 RxIEnumerable<>转换为IObservable<>,但我对Rx不太熟悉,也不确切知道如何混合TPL DataflowRx

使用 async/await 和 TPL 数据流的收益返回

问题 1

通过在使用者块上使用PostSendAsync,将IEnumerable<T>创建器插入 TPL 数据流链,如下所示:

foreach (string fileNameUri in fileNameUris)
{
    await _processingBlock.SendAsync(fileNameUri).ConfigureAwait(false);
}

您也可以使用 BufferBlock<TInput> ,但在您的情况下,它实际上似乎相当不必要(甚至是有害的 - 请参阅下一部分(。

问题2

你什么时候更喜欢SendAsync而不是Post?如果您的生产者运行速度快于 URI 的处理速度(并且您已指出是这种情况(,并且您选择为您的_processingBlock提供BoundedCapacity,那么当块的内部缓冲区达到指定容量时,您的SendAsync将"挂起",直到缓冲区插槽释放,并且您的foreach循环将受到限制。这种反馈机制会产生背压,并确保您不会耗尽内存。

问题3

在大多数情况下,您绝对应该使用 LinkTo 方法来链接您的块。不幸的是,由于IDisposable和非常大(潜在(序列的相互作用,您的情况是一个极端情况。因此,您的完成将在缓冲区和处理块之间自动流动(由于LinkTo(,但在那之后 - 您需要手动传播它。这很棘手,但可行。

我将用一个"Hello World"示例来说明这一点,其中生产者迭代每个字符,使用者(这真的很慢(将每个字符输出到调试窗口。

注意:LinkTo不存在。

// REALLY slow consumer.
var consumer = new ActionBlock<char>(async c =>
{
    await Task.Delay(100);
    Debug.Print(c.ToString());
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
var producer = new ActionBlock<string>(async s =>
{
    foreach (char c in s)
    {
        await consumer.SendAsync(c);
        Debug.Print($"Yielded {c}");
    }
});
try
{
    producer.Post("Hello world");
    producer.Complete();
    await producer.Completion;
}
finally
{
    consumer.Complete();
}
// Observe combined producer and consumer completion/exceptions/cancellation.
await Task.WhenAll(producer.Completion, consumer.Completion);

这输出:

产出 HH产生 ee产量升l产量升l产量 oo产生 屈服 ww产量 oo产生 rr产量升l产量 dd

从上面的输出中可以看出,生产者受到限制,块之间的切换缓冲区永远不会变得太大。

编辑

您可能会发现通过以下方式传播完成更干净

producer.Completion.ContinueWith(
    _ => consumer.Complete(), TaskContinuationOptions.ExecuteSynchronously
);

。紧接着producer定义。这使您可以稍微减少生产者/消费者耦合 - 但最后您仍然必须记住观察Task.WhenAll(producer.Completion, consumer.Completion) .

为了并行处理这些文件而不是运行内存,我正在尝试使用 IEnumerable<> 具有收益回报,然后进一步处理数据。

我不认为这一步是必要的。您实际上在这里避免的只是一个文件名列表。即使您有数百万个文件,文件名列表也不会占用大量内存。

我正在将输入缓冲区链接到操作块,而操作块又会发布到另一个队列。但是,通过使用ActionBlock<>,我无法将其链接到下一个块进行处理,并且必须手动将Async从ActionBlock<>发布/发送到BufferBlock<>。另外,在这种情况下,不确定如何跟踪完成情况。

ActionBlock<TInput>是一个"行尾"块。它只接受输入,不产生任何输出。在您的情况下,您不希望ActionBlock<TInput>;您需要 TransformManyBlock<TInput, TOutput> ,它接受输入,对其运行函数,并生成输出(每个输入项具有任意数量的输出项(。

要记住的另一点是,所有缓冲块都有一个输入缓冲器。因此,额外的BufferBlock是不必要的。

最后,如果您已经在"数据流领域",通常最好以实际执行某些操作的数据流块结尾(例如,ActionBlock而不是BufferBlock(。在这种情况下,可以将BufferBlock用作有界的生产者/使用者队列,其中其他一些代码正在使用结果。就个人而言,我认为将消费代码重写为ActionBlock的操作可能更干净,但让使用者独立于数据流也可能更干净。对于下面的代码,我保留了最终的有界BufferBlock,但是如果您使用此解决方案,请考虑将最终块更改为有界ActionBlock

private const int ProcessingSize= 4;
private static readonly HttpClient HttpClient = new HttpClient();
private TransformBlock<string, DataType> _processingBlock;
private BufferBlock<DataType> _messageBufferBlock;
public Task ProduceAsync()
{
  PrepareDataflow(token);
  ListFiles(_fileBufferBlock, token);
  _processingBlock.Complete();
  return _processingBlock.Completion;
}
private void ListFiles(ITargetBlock<string> targetBlock, CancellationToken token)
{
  ... // Get list of file Uris, occasionally calling token.ThrowIfCancellationRequested()
  foreach(var fileNameUri in fileNameUris)
    _processingBlock.Post(fileNameUri);
}
private async Task<IEnumerable<DataType>> ProcessFileAsync(string fileNameUri, CancellationToken token)
{
  return Process(await HttpClient.GetStreamAsync(fileNameUri), token);
}
private IEnumerable<DataType> Process(Stream stream, CancellationToken token)
{
  using (stream)
  using (var sr = new StreamReader(stream))
  using (var jsonTextReader = new JsonTextReader(sr))
  {
    while (jsonTextReader.Read())
    {
      token.ThrowIfCancellationRequested();
      if (jsonTextReader.TokenType == JsonToken.StartObject)
      {
        try
        {
          yield _jsonSerializer.Deserialize<DataType>(jsonTextReader);
        }
        catch (Exception ex)
        {
          _logger.Error(ex, $"JSON deserialization failed - {fileNameUri}");
        }
      }
    }
  }
}
private void PrepareDataflow(CancellationToken token)
{
  var executeOptions = new ExecutionDataflowBlockOptions
  {
    CancellationToken = token,
    MaxDegreeOfParallelism = ProcessingSize
  };
  _processingBlock = new TransformManyBlock<string, DataType>(fileName =>
      ProcessFileAsync(fileName, token), executeOptions);
  _messageBufferBlock = new BufferBlock<DataType>(new DataflowBlockOptions
  {
    CancellationToken = token,
    BoundedCapacity = 50000
  });
}

学习 Rx 可能非常困难,特别是对于混合异步并行数据流情况,您在此处拥有。

至于你的其他问题:

如何使用 IEnumerable<> 和 yeild return with async/await 和 dataflow。

asyncyield根本不兼容。至少在今天的语言中是这样。在您的情况下,JSON 读取器无论如何都必须同步读取流(它们不支持异步读取(,因此实际的流处理是同步的,可以与 yield 一起使用。执行初始来回以获取流本身仍然可以是异步的,并且可以与async一起使用。这和我们今天所能得到的一样好,直到 JSON 读取器支持异步读取并且语言支持 async yield .(Rx 今天可以做"异步收益",但 JSON 读取器仍然不支持异步读取,因此在这种特定情况下它无济于事(。

在本例中,如何跟踪完成情况。

如果 JSON 读取器确实支持异步读取,那么上面的解决方案将不是最好的解决方案。在这种情况下,您可能希望使用手动SendAsync调用,并且只需要链接这些块的完成,这可以按以下方式完成:

_processingBlock.Completion.ContinueWith(
    task =>
    {
      if (task.IsFaulted)
        ((IDataflowBlock)_messageBufferBlock).Fault(task.Exception);
      else if (!task.IsCanceled)
        _messageBufferBlock.Complete();
    },
    CancellationToken.None,
    TaskContinuationOptions.DenyChildAttach | TaskContinuationOptions.ExecuteSynchronously,
    TaskScheduler.Default);

我应该使用数据块的 LinkTo 功能来连接各种块吗?或者使用 OutputAvailableAsync(( 和 ReceiveAsync(( 等方法将数据从一个块传播到另一个块。

尽可能使用LinkTo。它为您处理所有极端情况。

应该扔吗? 应该责怪块吗?

这完全取决于你。默认情况下,当任何项目的任何处理失败时,块就会出错,如果你正在传播完成,整个块链都会出错。

错误块相当严重;它们会丢弃任何正在进行的工作并拒绝继续处理。如果要重试,则必须构建新的数据流网格。

如果您更喜欢"更软"的错误策略,则可以catch异常并执行类似记录它们的操作(代码当前执行(,也可以更改数据流块的性质以将异常作为数据项传递。

值得一看的是 Rx。 除非我缺少某些内容,否则您需要的整个代码(除了现有的 ProcessFileAsync 方法(将如下所示:

var query =
    fileNameUris
        .Select(fileNameUri =>
            Observable
                .FromAsync(ct => ProcessFileAsync(fileNameUri, ct)))
        .Merge(maxConcurrent : 4);
var subscription =
    query
        .Subscribe(
            u => { },
            () => { Console.WriteLine("Done."); });

做。它是异步运行的。可以通过拨打subscription.Dispose();来取消。您可以指定最大并行度。