如何使用 yield 返回并行块或任务中的 Item 集合

本文关键字:任务 Item 集合 何使用 yield 返回 并行 | 更新日期: 2023-09-27 18:35:30

我正在寻找有关如何利用 yield 关键字在并行块或任务块中返回 IEnumberable 的帮助。下面是伪代码

public IEnumerable<List<T>> ReadFile( )
{
    foreach (string filepath in lstOfFiles)
    {
        var stream = new FileStream(filepath , FileMode.Open, FileAccess.Read);
        foreach (var item in ReadStream(stream))
            yield return item; //where item is of type List<string>
    }
}

我想将上面的代码转换为并行块,如下所示

lstOfFiles.AsParallel()
          .ForAll(filepath =>
{
    var stream = new FileStream(filepath , FileMode.Open, FileAccess.Read);
    foreach (var item in ReadStream(Stream))
        yield return item;
});

但编译器会抛出错误,指出 Yield 不能在并行块或匿名委托中使用。我也尝试使用任务块,任务匿名委托中不允许产量

任何人都建议我简单而最好的方法来让 yield 返回并行块或任务中的数据集合。

我读到RX 2.0或TPL在上述情况下使用得很好。我不确定是否要使用 RX 或 TPL 库来异步返回值的产量。任何人都可以建议我哪个更好Rx或TPL。

如果我使用 Rx,是否有必要创建订阅并转换并行块 AsObservable。

如何使用 yield 返回并行块或任务中的 Item 集合

要使用 Rx,您必须使用 IObservable<T> 而不是 IEnumerable<T>

public IObservable<T> ReadFiles()
{
  return from filepath in lstOfFiles.ToObservable()
         from item in Observable.Using(() => File.OpenRead(filepath), ReadStream)
         select item;
}

每次对 ReadFiles 返回的可观察量调用 Subscribe 时,它都会遍历 lstOfFiles 中的所有字符串,并并行读取每个文件流。

查询按顺序打开每个文件流并将其传递给 ReadStream ,后者负责为给定流生成项目的异步序列。

ReadFiles查询使用用查询理解语法编写的SelectMany运算符,将所有ReadStream可观察量生成的每个"项目"合并到单个可观察量序列中,同时尊重源的异步性。

您应该强烈考虑为您的ReadStream方法编写异步迭代器,如我在这里所示;否则,如果您必须返回 IEnumerable<T> ,则必须通过将 ToObservable(scheduler) 运算符与并发引入调度程序一起应用来转换它,这可能会效率较低。

public IObservable<Item> ReadStream(Stream stream)
{
  return Observable.Create<Item>(async (observer, cancel) =>
  {
    // Here's one example of reading a stream with fixed item lengths.
    var buffer = new byte[itemLength]; // TODO: Define itemLength
    var remainder = itemLength;
    int read;
    do
    {
      read = await stream.ReadAsync(buffer, itemLength - remainder, remainder, cancel)
                         .ConfigureAwait(false);
      remainder -= read;
      if (read == 0)
      {
        if (remainder < itemLength)
        {
          throw new InvalidOperationException("End of stream unexpected.");
        }
        else
        {
          break;
        }
      }
      else if (remainder == 0)
      {
        observer.OnNext(ReadItem(buffer));  // TODO: Define ReadItem
        remainder = itemLength;
      }
    }
    while (true);
  });
}

* Rx 在这里不引入任何并发性。并行化只是底层 API 异步性质的结果,因此非常高效。异步读取文件流可能会导致 Windows 使用 I/O 完成端口作为优化,在每个缓冲区可用时通知池线程。这可确保 Windows 完全负责计划对应用程序的回调,而不是 TPL 或您自己。

Rx 是自由线程的,因此发送给观察程序的每个通知可能位于不同的池线程上;但是,由于 Rx 的序列化协定(§4.2 Rx 设计指南),当您调用 Subscribe 时,观察器中不会收到重叠的通知,因此无需提供显式同步,例如锁定。

但是,由于此查询的并行化性质,您可能会观察到与每个文件有关的交替通知,但永远不会看到重叠通知。

如果您希望一次接收给定文件的所有项目,正如您在问题中暗示的那样,那么您只需将 ToList 运算符应用于查询并更改返回类型:

public IObservable<IList<T>> ReadFiles()
{
  return from filepath in lstOfFiles.ToObservable()
         from items in Observable.Using(() => File.OpenRead(filepath), ReadStream)
                                 .ToList()
         select items;
}

如果需要观察具有线程相关性的通知(例如,在 GUI 线程上),则必须封送通知,因为它们将到达池线程。由于此查询本身不会引入并发性,因此实现此目的的最佳方法是应用ObserveOnDispatcher运算符(WPF、应用商店应用、电话、Silverlight)或ObserveOn(SynchronizationContext)重载(WinForms、ASP.NET 等)。只是不要忘记添加对相应特定于平台的 NuGet 包的引用;例如,Rx-WPF,Rx-WinForms,Rx-WindowsStore等。

您可能会想将可观察量转换回IEnumerable<T>,而不是调用Subscribe。不要这样做。在大多数情况下,这是不必要的,它可能效率低下,在最坏的情况下,它可能会导致死锁。一旦你进入异步的世界,你应该试着留在其中。这不仅适用于 Rx,也适用于 async/await .

看起来你想用SelectMany.您不能在匿名方法中使用yield,但可以将其分解为新方法,如下所示:

IEnumerable<Item> items = lstOfFiles.AsParallel()
    .SelectMany(( filepath ) => ReadItems(filepath));
IEnumerable<Item> ReadItems(string filePath)
{
    using(var Stream = new FileStream(filePath, FileMode.Open, FileAccess.Read))
    {
        foreach (var item in ReadStream(Stream))
            yield return item;
    }
}