如何使用 yield 返回并行块或任务中的 Item 集合
本文关键字:任务 Item 集合 何使用 yield 返回 并行 | 更新日期: 2023-09-27 18:35:30
我正在寻找有关如何利用 yield 关键字在并行块或任务块中返回 IEnumberable 的帮助。下面是伪代码
public IEnumerable<List<T>> ReadFile( )
{
foreach (string filepath in lstOfFiles)
{
var stream = new FileStream(filepath , FileMode.Open, FileAccess.Read);
foreach (var item in ReadStream(stream))
yield return item; //where item is of type List<string>
}
}
我想将上面的代码转换为并行块,如下所示
lstOfFiles.AsParallel()
.ForAll(filepath =>
{
var stream = new FileStream(filepath , FileMode.Open, FileAccess.Read);
foreach (var item in ReadStream(Stream))
yield return item;
});
但编译器会抛出错误,指出 Yield 不能在并行块或匿名委托中使用。我也尝试使用任务块,任务匿名委托中不允许产量
任何人都建议我简单而最好的方法来让 yield 返回并行块或任务中的数据集合。
我读到RX 2.0或TPL在上述情况下使用得很好。我不确定是否要使用 RX 或 TPL 库来异步返回值的产量。任何人都可以建议我哪个更好Rx或TPL。
如果我使用 Rx,是否有必要创建订阅并转换并行块 AsObservable。
要使用 Rx,您必须使用 IObservable<T>
而不是 IEnumerable<T>
。
public IObservable<T> ReadFiles()
{
return from filepath in lstOfFiles.ToObservable()
from item in Observable.Using(() => File.OpenRead(filepath), ReadStream)
select item;
}
每次对 ReadFiles
返回的可观察量调用 Subscribe
时,它都会遍历 lstOfFiles
中的所有字符串,并并行读取每个文件流。
查询按顺序打开每个文件流并将其传递给 ReadStream
,后者负责为给定流生成项目的异步序列。
ReadFiles
查询使用用查询理解语法编写的SelectMany
运算符,将所有ReadStream
可观察量生成的每个"项目"合并到单个可观察量序列中,同时尊重源的异步性。
您应该强烈考虑为您的ReadStream
方法编写异步迭代器,如我在这里所示;否则,如果您必须返回 IEnumerable<T>
,则必须通过将 ToObservable(scheduler)
运算符与并发引入调度程序一起应用来转换它,这可能会效率较低。
public IObservable<Item> ReadStream(Stream stream)
{
return Observable.Create<Item>(async (observer, cancel) =>
{
// Here's one example of reading a stream with fixed item lengths.
var buffer = new byte[itemLength]; // TODO: Define itemLength
var remainder = itemLength;
int read;
do
{
read = await stream.ReadAsync(buffer, itemLength - remainder, remainder, cancel)
.ConfigureAwait(false);
remainder -= read;
if (read == 0)
{
if (remainder < itemLength)
{
throw new InvalidOperationException("End of stream unexpected.");
}
else
{
break;
}
}
else if (remainder == 0)
{
observer.OnNext(ReadItem(buffer)); // TODO: Define ReadItem
remainder = itemLength;
}
}
while (true);
});
}
* Rx 在这里不引入任何并发性。并行化只是底层 API 异步性质的结果,因此非常高效。异步读取文件流可能会导致 Windows 使用 I/O 完成端口作为优化,在每个缓冲区可用时通知池线程。这可确保 Windows 完全负责计划对应用程序的回调,而不是 TPL 或您自己。
Rx 是自由线程的,因此发送给观察程序的每个通知可能位于不同的池线程上;但是,由于 Rx 的序列化协定(§4.2 Rx 设计指南),当您调用 Subscribe
时,观察器中不会收到重叠的通知,因此无需提供显式同步,例如锁定。
但是,由于此查询的并行化性质,您可能会观察到与每个文件有关的交替通知,但永远不会看到重叠通知。
如果您希望一次接收给定文件的所有项目,正如您在问题中暗示的那样,那么您只需将 ToList
运算符应用于查询并更改返回类型:
public IObservable<IList<T>> ReadFiles()
{
return from filepath in lstOfFiles.ToObservable()
from items in Observable.Using(() => File.OpenRead(filepath), ReadStream)
.ToList()
select items;
}
如果需要观察具有线程相关性的通知(例如,在 GUI 线程上),则必须封送通知,因为它们将到达池线程。由于此查询本身不会引入并发性,因此实现此目的的最佳方法是应用ObserveOnDispatcher
运算符(WPF、应用商店应用、电话、Silverlight)或ObserveOn(SynchronizationContext)
重载(WinForms、ASP.NET 等)。只是不要忘记添加对相应特定于平台的 NuGet 包的引用;例如,Rx-WPF,Rx-WinForms,Rx-WindowsStore等。
您可能会想将可观察量转换回IEnumerable<T>
,而不是调用Subscribe
。不要这样做。在大多数情况下,这是不必要的,它可能效率低下,在最坏的情况下,它可能会导致死锁。一旦你进入异步的世界,你应该试着留在其中。这不仅适用于 Rx,也适用于 async/await
.
看起来你想用SelectMany
.您不能在匿名方法中使用yield
,但可以将其分解为新方法,如下所示:
IEnumerable<Item> items = lstOfFiles.AsParallel()
.SelectMany(( filepath ) => ReadItems(filepath));
IEnumerable<Item> ReadItems(string filePath)
{
using(var Stream = new FileStream(filePath, FileMode.Open, FileAccess.Read))
{
foreach (var item in ReadStream(Stream))
yield return item;
}
}