尝试在长时间运行的生成器上使用PLINQ的缺陷

本文关键字:缺陷 PLINQ 长时间 运行 | 更新日期: 2023-09-27 18:21:16

我有一些无限生成器方法,包括一些长时间运行和无限长时间运行的生成器。

IEnumerable<T> ExampleOne() { 
    while(true) // this one blocks for a few seconds at a time
        yield return LongRunningFunction();
}
IEnumerable<T> ExampleTwo() { 
    while(true) //this one blocks for a really long time
        yield return OtherLongRunningFunction();
}

我的目标是有一个无限序列,将两个例子中的项目组合在一起。以下是我使用PLINQ:进行的尝试

 IEnumerable<T> combined =  new[] { ExampleOne(), ExampleTwo() }
           .AsParallel()
           .WithMergeOptions(ParallelMergeOptions.NotBuffered)
           .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
           .SelectMany(source => source.GetRequests());

这似乎适当地将两个IEnumerable组合成一个新的IEnumerables,每当IEnumerable#1和#2的项目出现在两个源IEnumerables:中的任何一个中时,它们都可用

//assuming ExampleTwo yields TWO but happens roughly 5 times 
//less often then ExampleOne
Example output:  one one one one one TWO one one one one one one TWO

然而,似乎有时(通常在运行了许多小时之后)OtherLongRunningFunction()会持续很长一段时间而不返回,并且在难以复制的条件下,combined序列会阻塞它,而不是继续返回第一个LongRunningFunction的结果。看起来,尽管组合并行查询一开始使用两个线程,但后来决定切换到一个线程

我的第一个想法是"这可能是RX Observable.Merge的工作,而不是PLINQ的工作。"但我很感激这两个答案,它们显示了处理这种情况的正确替代方法,以及PLINQ如何在查询开始数小时后更改并行度的机制的解释。

尝试在长时间运行的生成器上使用PLINQ的缺陷

这是Rx的方法,事实上,它确实使用了Merge:

IObservable<T> LongRunningFunction()
{
    return Observable.Start(() => {
        // Calculate some stuff
        return blah;
    }, Scheduler.TaskPoolScheduler);
}
Observable.Merge(
    Observable.Defer(LongRunningFunction).Repeat(),
    Observable.Defer(OtherLongRunningFunction).Repeat(),
).Subscribe(x => {
    Console.WriteLine("An item: {0}", x);
});

如果你想要TPL的好处,特别是对于负载不同的任务(当你的订阅被阻止,并且已经生成了很多项目时会发生什么?你应该停止生成项目吗?),我推荐TPL DataFlow。

如果你想用Rx做这件事,对于真正长时间运行的计算任务,最好不要阻塞线程池:

var stream = Observable.Merge(ExampleTwo().ToObservable(Scheduler.NewThread), ExampleOne().ToObservable(Scheduler.NewThread));
stream.Subscribe(...);

关于PLINQ:的机制

我遇到了同样的问题:我有一个序列,它的项目需要不均匀的处理时间,其中一些项目需要更长的数量级。我经历了线程饥饿,在8核处理器上比在4核处理器上更具可再现性,尽管在4核上处理数小时后也可能发生这种情况。一些线程可能会在一段时间后重新开始工作。请注意,使用了动态分块,如示例所示。

观察:饥饿更可能发生在连续的、长时间运行的工作项目完成时。

MSDN主题"并行循环"提供了一些启示:

如果使用需要几秒钟或更长时间的单个步骤的并行循环,请小心。这可能发生在I/O绑定的工作负载以及冗长的计算中。如果循环需要很长时间,那么由于.NET ThreadPool类的线程注入逻辑使用了一种防止线程饥饿的启发式方法,您可能会经历工作线程的无限增长。当当前池的工作项长时间运行时,这种启发式方法会稳步增加工作线程的数量。其动机是在线程池中的所有内容都被阻塞的情况下添加更多的线程。不幸的是,如果工作确实在进行,那么更多的线程可能不一定是您想要的。.NET框架无法区分这两种情况

我仍然不知道细节,但我认为潜在的ThreadPool的启发式方法不能很好地解释长时间运行的工作项,由于一些上限没有适当调整,无法为下一次迭代容纳线程,从而使迭代排队。我没有Visual Studio访问8核机器的权限,在那里问题更容易重现。我还无法在4核机器上通过Visual Studio调试重现这个问题。调查仍在继续。

有关更多详细信息,"任务并行库(或PLINQ)是否考虑了其他进程?"主题非常相关。