平行的.ForEach会导致“内存不足”.处理带有大对象的可枚举对象时异常

本文关键字:内存不足 对象 枚举 异常 ForEach 处理 | 更新日期: 2023-09-27 18:04:51

我正在尝试迁移一个数据库,其中图像存储在数据库中,指向硬盘驱动器上的文件的数据库中的记录。我试图使用Parallel.ForEach来加快使用这种方法查询出数据的过程。

然而,我注意到我得到了一个OutOfMemory异常。我知道Parallel.ForEach将查询一批枚举,以减轻开销的成本,如果有一个间隔查询(所以你的源将更有可能有下一个记录缓存在内存中,如果你做一堆查询一次,而不是间隔它们)。问题是由于我返回的记录之一是1-4Mb字节数组,缓存导致整个地址空间用完(程序必须在x86模式下运行,因为目标平台将是32位机器)

是否有办法禁用缓存或使其更小的TPL?


下面是一个示例程序来说明这个问题。这必须在x86模式下编译,以显示问题,如果它需要很长时间或没有发生在您的机器上,则会增加数组的大小(我发现1 << 20在我的机器上大约需要30秒,4 << 20几乎是瞬时的)

class Program
{
    static void Main(string[] args)
    {
        Parallel.ForEach(CreateData(), (data) =>
            {
                data[0] = 1;
            });
    }
    static IEnumerable<byte[]> CreateData()
    {
        while (true)
        {
            yield return new byte[1 << 20]; //1Mb array
        }
    }
}

平行的.ForEach会导致“内存不足”.处理带有大对象的可枚举对象时异常

Parallel.ForEach 的默认选项仅在任务是cpu绑定的并且线性缩放时才能正常工作。当任务受cpu限制时,一切都很顺利。如果您有一个四核并且没有运行其他进程,那么Parallel.ForEach将使用所有四个处理器。如果你有一个四核处理器,而计算机上的其他进程使用了一个完整的CPU,那么Parallel.ForEach大约使用了三个处理器。

但是如果任务不是cpu限制的,那么Parallel.ForEach将继续启动任务,努力使所有cpu都处于繁忙状态。然而,无论并行运行多少任务,总是有更多未使用的CPU能力,因此它不断创建任务。

你怎么知道你的任务是cpu限制的?希望只是检查一下。如果你在分解质数,这是很明显的。但其他情况就不那么明显了。判断任务是否受cpu限制的经验方法是限制ParallelOptions.MaximumDegreeOfParallelism的最大并行度,并观察程序的行为。如果您的任务是cpu限制的,那么您应该在四核系统上看到这样的模式:

  • ParallelOptions.MaximumDegreeOfParallelism = 1:使用一个完整的CPU或25%的CPU利用率
  • ParallelOptions.MaximumDegreeOfParallelism = 2:使用2个CPU或50% CPU利用率
  • ParallelOptions.MaximumDegreeOfParallelism = 4:使用所有CPU或100% CPU利用率

如果它的行为像这样,那么你可以使用默认的Parallel.ForEach选项并获得良好的结果。线性的CPU利用率意味着良好的任务调度。

但是如果我在我的Intel i7上运行您的示例应用程序,无论我设置多大的最大并行度,我都会得到大约20%的CPU利用率。为什么会这样?由于分配了太多内存,垃圾收集器阻塞了线程。应用程序是资源绑定的,资源是内存

同样,对数据库服务器执行长时间运行查询的I/o绑定任务也永远无法有效利用本地计算机上所有可用的CPU资源。在这种情况下,任务调度程序无法"知道何时停止"启动新任务。

如果你的任务不是CPU限制的,或者CPU利用率不随最大并行度线性扩展,那么你应该建议Parallel.ForEach不要一次启动太多的任务。最简单的方法是指定一个数字,允许重叠的I/o任务有一定的并行性,但不要太多,以免超出本地计算机对资源的需求或使任何远程服务器负担过重。为了得到最好的结果,需要反复试验:

static void Main(string[] args)
{
    Parallel.ForEach(CreateData(),
        new ParallelOptions { MaxDegreeOfParallelism = 4 },
        (data) =>
            {
                data[0] = 1;
            });
}

所以,虽然Rick的建议绝对是重要的一点,但我认为还有一件事被忽略了,那就是对分区的讨论。

Parallel::ForEach将使用默认的Partitioner<T>实现,对于未知长度的IEnumerable<T>,将使用块分区策略。这意味着Parallel::ForEach将用于处理数据集的每个工作线程将从IEnumerable<T>读取一定数量的元素,然后这些元素将仅由该线程处理(现在忽略工作窃取)。这样做是为了节省不断返回源并分配一些新工作并为另一个工作线程调度的费用。所以,通常,这是一件好事。然而,在您的特定场景中,假设您在四核上,并且您已经将MaxDegreeOfParallelism设置为4个线程用于您的工作,现在每个线程从IEnumerable<T>中提取100个元素。好吧,这是100-400兆对于那个特定的工作线程,对吧?

那么如何解决这个问题呢?很简单,您编写一个自定义的Partitioner<T>实现。现在,分块在您的情况下仍然是有用的,所以您可能不想使用单元素分区策略,因为那样会引入所有必要的任务协调的开销。相反,我会编写一个可配置的版本,您可以通过appsetting对其进行调优,直到找到适合您工作负载的最佳平衡。好消息是,虽然编写这样的实现非常简单,但实际上您甚至不必自己编写它,因为PFX团队已经完成并将其放入并行编程示例项目中。

这个问题与分区器有关,而与并行度无关。解决方案是实现一个自定义数据分区器。

如果数据集很大,那么TPL的单一实现似乎可以保证内存不足。这种情况最近发生在我身上(实际上我正在运行上面的循环,并发现内存线性增加,直到它给我一个OOM异常)。

跟踪问题后,我发现默认情况下mono会划分使用EnumerablePartitioner类的枚举器。这门课有它的行为是每次将数据分配给任务时,它都会"分块"数据以一个不断增加的(不可改变的)因子2。首先当任务请求数据时,它会获得大小为1的块,下一次大小为12*1=2,下一次2*2=4,然后2*4=8,以此类推。结果是传递给任务的数据量,因此存储在内存中同时,随着任务的长度和数据量的增加而增加

可以推测,这种行为的最初原因是它想要避免让每个线程返回多次以获取数据,但它似乎是基于所有正在处理的数据都可以放入内存的假设(从大文件读取时不是这种情况)。

如前所述,使用自定义分区器可以避免此问题。下面是一次只将数据返回给每个任务一项的一个通用示例: https://gist.github.com/evolvedmicrobe/7997971

只需首先实例化该类并将其交给Parallel。而不是枚举本身

虽然使用自定义分区器无疑是最"正确"的答案,但更简单的解决方案是让垃圾收集器赶上来。在我尝试的情况下,我不断地给一个平行体打电话。函数内的For循环。尽管每次退出函数时,程序使用的内存都保持线性增长,如下所述。我说:

//Force garbage collection.
GC.Collect();
// Wait for all finalizers to complete before continuing.
GC.WaitForPendingFinalizers();

,虽然它不是超级快,但它确实解决了内存问题。假设在高cpu使用率和内存使用率时,垃圾收集器无法有效地运行。