为什么localInit函数在Parallel.ForEach中每个线程被调用多次?

本文关键字:线程 调用 函数 localInit Parallel ForEach 为什么 | 更新日期: 2023-09-27 17:54:27

我正在编写一些代码来处理大量数据,我认为使用Parallel会很有用。ForEach为它创建的每个线程创建一个文件,这样输出就不需要同步了(至少对我来说)。

它看起来像这样:

Parallel.ForEach(vals,
    new ParallelOptions { MaxDegreeOfParallelism = 8 },
    ()=>GetWriter(), // returns a new BinaryWriter backed by a file with a guid name
    (item, state, writer)=>
    {
        if(something)
        {
            state.Break();
            return writer;
        }
        List<Result> results = new List<Result>();
        foreach(var subItem in item.SubItems)
            results.Add(ProcessItem(subItem));
        if(results.Count > 0)
        {
            foreach(var result in results)
                result.Write(writer);
        }
        return writer;
    },
    (writer)=>writer.Dispose());

我所期望发生的是,最多将创建8个文件,并在整个运行时持续存在。然后,当整个ForEach调用结束时,每个都将被dispose。实际发生的情况是,localInit似乎对每个项调用一次,所以我最终得到了数百个文件。写入器也会在处理的每个项结束时被处理。

这显示了同样的事情发生:

var vals = Enumerable.Range(0, 10000000).ToArray();
        long sum = 0;
        Parallel.ForEach(vals,
            new ParallelOptions { MaxDegreeOfParallelism = 8 },
            () => { Console.WriteLine("init " + Thread.CurrentThread.ManagedThreadId); return 0L; },
            (i, state, common) =>
            {
                Thread.Sleep(10);
                return common + i;
            },
                (common) => Interlocked.Add(ref sum, common));
我看到

:

init 10
init 14
init 11
init 13
init 12
init 14
init 11
init 12
init 13
init 11
... // hundreds of lines over < 30 seconds
init 14
init 11
init 18
init 17
init 10
init 11
init 14
init 11
init 14
init 11
init 18

注意:如果我遗漏线程。睡眠呼叫,有时似乎功能"正确"。localInit只被调用一次,每个4个线程,它决定在我的pc上使用。但也不是每次都这样。

这是函数期望的行为吗?是什么在幕后导致它这样做呢?最后,什么是获得我想要的功能的好方法,ThreadLocal?

顺便说一下,这是在。net 4.5上。

为什么localInit函数在Parallel.ForEach中每个线程被调用多次?

Parallel.ForEach不像你想象的那样工作。值得注意的是,该方法是建立在Task类之上的,并且 TaskThread之间的关系不是1:1。例如,您可以在2个托管线程上运行10个任务。

尝试在你的方法体中使用这一行,而不是当前这一行:

Console.WriteLine("ThreadId {0} -- TaskId {1} ",
                  Thread.CurrentThread.ManagedThreadId, Task.CurrentId);

您应该看到ThreadId将在许多不同的任务中被重用,由它们的唯一id显示。如果您保留或增加对Thread.Sleep的调用,您将更多地看到这一点。

Parallel.ForEach方法如何工作的(非常)基本思想是,它让你的enumerable创建一系列任务,这些任务将运行枚举的过程部分,完成的方式很大程度上取决于输入。还有一些特殊的逻辑,用于检查任务是否超过一定毫秒数而未完成。如果这种情况为真,则可以生成一个新任务来帮助减轻工作。

如果您查看Parallel.ForEachlocalinit函数的文档,您会注意到它说它是returns the initial state of the local data for each _task_,而不是每个线程

你可能会问为什么有超过8个任务被生成。这个答案与最后一个类似,可以在ParallelOptions.MaxDegreeOfParallelism的文档中找到。

MaxDegreeOfParallelism从默认值更改为限制并发任务的使用数量。

此限制仅针对并发任务的数量,而不是在整个处理过程中创建的任务数量的硬限制。正如我上面提到的,有时会产生一个单独的任务,这导致多次调用localinit函数并将数百个文件写入磁盘。

写入磁盘肯定是一个有一点延迟的操作,特别是如果您使用同步I/O。当磁盘操作发生时,它阻塞了整个线程;Thread.Sleep也是如此。如果Task这样做,它将阻塞当前正在运行的线程,并且没有其他任务可以在其上运行。通常在这种情况下,调度器会生成一个新的Task来帮助弥补空闲。

最后,什么是获得我想要的功能的好方法,ThreadLocal?

底线是线程局部变量对Parallel.ForEach没有意义,因为你没有处理线程;你在处理任务。线程本地可以在任务之间共享,因为许多任务可以同时使用同一个线程。此外,任务的线程本地可以在执行过程中更改,因为调度器可以抢占它的运行,然后在另一个线程上继续执行,该线程将具有不同的线程本地。

我不确定最好的方法,但是你可以依靠localinit函数来传递你想要的任何资源,一次只允许一个资源在一个线程中使用。您可以使用localfinally将其标记为不再使用,从而可用于其他任务获取。这就是设计这些方法的目的;对于生成的任务,每个方法只被调用一次(参见Parallel.ForEach MSDN文档的注释部分)。

您也可以自己拆分工作,创建自己的线程集并运行您的工作。然而,在我看来,这是不太明智的,因为Parallel类已经为您完成了这项繁重的工作。

您所看到的是试图尽快完成您的工作的实现。

为此,它尝试使用不同数量的任务来最大化吞吐量。它从线程池中获取一定数量的线程,并运行您的工作一段时间。然后尝试添加和删除线程,看看会发生什么。它会继续这样做,直到你所有的工作都完成。

这个算法是相当愚蠢的,因为它不知道你的工作是否使用了大量的CPU,或者大量的IO,或者即使有很多同步并且线程相互阻塞。它所能做的就是添加和删除线程,并测量每个工作单元完成的速度。

这意味着它在注入和退出线程时不断调用localInitlocalFinally函数-这就是你所发现的。

不幸的是,没有简单的方法来控制这个算法。Parallel.ForEach是一个高级结构,它有意隐藏了许多线程管理代码。


使用ThreadLocal可能会有所帮助,但它依赖于这样一个事实,即当Parallel.ForEach请求新线程时,线程池将重用相同的线程。这并不能保证——事实上,线程池不太可能在整个调用中恰好使用8个线程。这意味着您将再次创建多余的文件。


保证的一件事是Parallel.ForEach在任何时候都不会使用超过MaxDegreeOfParallelism的线程。

您可以通过创建一个固定大小的文件"池"来利用这一点,该文件池可以被在特定时间运行的任何线程重用。您知道一次只能运行MaxDegreeOfParallelism线程,因此可以在调用ForEach之前创建这个数量的文件。然后在localInit中抓取一个,并在localFinally中释放它。

当然,您必须自己编写这个池,并且它必须是线程安全的,因为它将被并发调用。不过,一个简单的锁定策略应该就足够了,因为与锁的成本相比,线程的注入和退役速度不会很快。

根据MSDN, localInit方法为每个任务调用一次,而不是为每个线程调用:

localInit委托为参与循环执行的每个任务调用一次,并返回每个任务的初始本地状态。

localInit在线程创建时调用。如果主体花了很长时间,它必须创建另一个线程并挂起当前线程,如果它创建了另一个线程,它会调用localInit

也适用于Parallel。调用ForEach时,创建的线程与MaxDegreeOfParallelism的值一样多,例如:

var k = Enumerable.Range(0, 1);
Parallel.ForEach(k,new ParallelOptions(){MaxDegreeOfParallelism = 4}.....

第一次调用

时创建4个线程