为什么Parallel.ForEach比AsParallel().ForAll()快得多,尽管MSDN提出了相反的建议

本文关键字:MSDN ForEach Parallel AsParallel ForAll 为什么 快得多 尽管 | 更新日期: 2023-09-27 18:22:23

我一直在做一些调查,看看如何创建一个通过树运行的多线程应用程序。

为了找到如何以最佳方式实现这一点,我创建了一个测试应用程序,该应用程序运行在我的C:''disk中并打开所有目录。

class Program
{
    static void Main(string[] args)
    {
        //var startDirectory = @"C:'The folder'RecursiveFolder";
        var startDirectory = @"C:'";
        var w = Stopwatch.StartNew();
        ThisIsARecursiveFunction(startDirectory);
        Console.WriteLine("Elapsed seconds: " + w.Elapsed.TotalSeconds);
        Console.ReadKey();
    }
    public static void ThisIsARecursiveFunction(String currentDirectory)
    {
        var lastBit = Path.GetFileName(currentDirectory);
        var depth = currentDirectory.Count(t => t == '''');
        //Console.WriteLine(depth + ": " + currentDirectory);
        try
        {
            var children = Directory.GetDirectories(currentDirectory);
            //Edit this mode to switch what way of parallelization it should use
            int mode = 3;
            switch (mode)
            {
                case 1:
                    foreach (var child in children)
                    {
                        ThisIsARecursiveFunction(child);
                    }
                    break;
                case 2:
                    children.AsParallel().ForAll(t =>
                    {
                        ThisIsARecursiveFunction(t);
                    });
                    break;
                case 3:
                    Parallel.ForEach(children, t =>
                    {
                        ThisIsARecursiveFunction(t);
                    });
                    break;
                default:
                    break;
            }
        }
        catch (Exception eee)
        {
            //Exception might occur for directories that can't be accessed.
        }
    }
}

然而,我遇到的是,当在模式3(Parallel.ForEach)下运行时,代码在大约2.5秒内完成(是的,我有一个SSD;)。在不进行并行化的情况下运行代码,大约在8秒内完成。在模式2(AsParalle.ForAll())下运行代码需要花费几乎无限长的时间。

在检查流程资源管理器时,我还遇到了一些奇怪的事实:

Mode1 (No Parallelization):
Cpu:     ~25%
Threads: 3
Time to complete: ~8 seconds
Mode2 (AsParallel().ForAll()):
Cpu:     ~0%
Threads: Increasing by one per second (I find this strange since it seems to be waiting on the other threads to complete or a second timeout.)
Time to complete: 1 second per node so about 3 days???
Mode3 (Parallel.ForEach()):
Cpu:     100%
Threads: At most 29-30
Time to complete: ~2.5 seconds

我发现特别奇怪的是,Parallel.ForEach似乎忽略了任何仍在运行的父线程/任务,而AsParallel()。ForAll()似乎在等待前一个任务完成(这不会很快,因为所有父任务仍在等待其子任务完成)。

我在MSDN上读到的还有:"当可能的时候,更喜欢ForAll而不是ForEach"

来源:http://msdn.microsoft.com/en-us/library/dd997403(v=vs.110).aspx

有人知道为什么会这样吗?

编辑1:

按照Matthew Watson的要求,我先将树加载到内存中,然后循环通过它。现在,树的加载是按顺序完成的。

然而,结果是相同的。Unparallelized和Parallel.ForEach现在在大约0.05秒内完成整个树,而AsParallel().ForAll仍然每秒只进行大约1步。

代码:

class Program
{
    private static DirWithSubDirs RootDir;
    static void Main(string[] args)
    {
        //var startDirectory = @"C:'The folder'RecursiveFolder";
        var startDirectory = @"C:'";
        Console.WriteLine("Loading file system into memory...");
        RootDir = new DirWithSubDirs(startDirectory);
        Console.WriteLine("Done");

        var w = Stopwatch.StartNew();
        ThisIsARecursiveFunctionInMemory(RootDir);
        Console.WriteLine("Elapsed seconds: " + w.Elapsed.TotalSeconds);
        Console.ReadKey();
    }        
    public static void ThisIsARecursiveFunctionInMemory(DirWithSubDirs currentDirectory)
    {
        var depth = currentDirectory.Path.Count(t => t == '''');
        Console.WriteLine(depth + ": " + currentDirectory.Path);
        var children = currentDirectory.SubDirs;
        //Edit this mode to switch what way of parallelization it should use
        int mode = 2;
        switch (mode)
        {
            case 1:
                foreach (var child in children)
                {
                    ThisIsARecursiveFunctionInMemory(child);
                }
                break;
            case 2:
                children.AsParallel().ForAll(t =>
                {
                    ThisIsARecursiveFunctionInMemory(t);
                });
                break;
            case 3:
                Parallel.ForEach(children, t =>
                {
                    ThisIsARecursiveFunctionInMemory(t);
                });
                break;
            default:
                break;
        }
    }
}
class DirWithSubDirs
{
    public List<DirWithSubDirs> SubDirs = new List<DirWithSubDirs>();
    public String Path { get; private set; }
    public DirWithSubDirs(String path)
    {
        this.Path = path;
        try
        {
            SubDirs = Directory.GetDirectories(path).Select(t => new DirWithSubDirs(t)).ToList();
        }
        catch (Exception eee)
        {
            //Ignore directories that can't be accessed
        }
    }
}

编辑2:

在阅读了Matthew评论的更新后,我尝试将以下代码添加到程序中:

ThreadPool.SetMinThreads(4000, 16);
ThreadPool.SetMaxThreads(4000, 16);

然而,这并没有改变AsParallel的形成方式。在减速到1步/秒之前,前8步仍在瞬间执行。

(特别注意,我目前正在忽略当我无法通过Directory.GetDirectories()周围的Try-Catch块访问目录时发生的异常。)

编辑3:

另外,我主要感兴趣的是Parallel.ForEach和AsParallel.ForAll之间的区别,因为对我来说,奇怪的是,出于某种原因,第二个为它所做的每一次递归创建一个线程,而第一个则最多处理30个线程中的所有线程

编辑4:

我发现了另一件奇怪的事情:当我试图将线程池上的MinThreads设置为1023以上时,它似乎忽略了该值,并缩回到8或16左右:ThreadPool.SetMinThreads(1023,16);

尽管如此,当我使用1023时,它会非常快地完成前1023个元素,然后回到我一直经历的缓慢节奏。

注意:实际上,现在创建了1000多个线程(相比之下,整个Parallel.ForEach创建了30个线程)。

这是否意味着Parallel.ForEach在处理任务方面更聪明?

更多信息,当您将值设置为1023以上时,此代码会打印两次8-8:

        int threadsMin;
        int completionMin;
        ThreadPool.GetMinThreads(out threadsMin, out completionMin);
        Console.WriteLine("Cur min threads: " + threadsMin + " and the other thing: " + completionMin);
        ThreadPool.SetMinThreads(1023, 16);
        ThreadPool.SetMaxThreads(1023, 16);
        ThreadPool.GetMinThreads(out threadsMin, out completionMin);
        Console.WriteLine("Now min threads: " + threadsMin + " and the other thing: " + completionMin);

编辑5:

根据Dean的要求,我创建了另一个手动创建任务的案例:

case 4:
    var taskList = new List<Task>();
    foreach (var todo in children)
    {
        var itemTodo = todo;
        taskList.Add(Task.Run(() => ThisIsARecursiveFunctionInMemory(itemTodo)));
    }
    Task.WaitAll(taskList.ToArray());
    break;

这也和Parallel.ForEach()循环一样快。因此,我们仍然不知道为什么AsParallel().ForAll()要慢得多。

为什么Parallel.ForEach比AsParallel().ForAll()快得多,尽管MSDN提出了相反的建议

这个问题很容易调试,当您遇到线程问题时,这是一种罕见的奢侈。这里的基本工具是"调试">"窗口">"线程"调试器窗口。向您显示活动线程,并让您了解它们的堆栈跟踪。你会很容易地看到,一旦速度变慢,你会有几十个的线程处于活动状态,它们都被卡住了。他们的堆栈跟踪看起来都一样:

    mscorlib.dll!System.Threading.Monitor.Wait(object obj, int millisecondsTimeout, bool exitContext) + 0x16 bytes  
    mscorlib.dll!System.Threading.Monitor.Wait(object obj, int millisecondsTimeout) + 0x7 bytes 
    mscorlib.dll!System.Threading.ManualResetEventSlim.Wait(int millisecondsTimeout, System.Threading.CancellationToken cancellationToken) + 0x182 bytes    
    mscorlib.dll!System.Threading.Tasks.Task.SpinThenBlockingWait(int millisecondsTimeout, System.Threading.CancellationToken cancellationToken) + 0x93 bytes   
    mscorlib.dll!System.Threading.Tasks.Task.InternalRunSynchronously(System.Threading.Tasks.TaskScheduler scheduler, bool waitForCompletion) + 0xba bytes  
    mscorlib.dll!System.Threading.Tasks.Task.RunSynchronously(System.Threading.Tasks.TaskScheduler scheduler) + 0x13 bytes  
    System.Core.dll!System.Linq.Parallel.SpoolingTask.SpoolForAll<ConsoleApplication1.DirWithSubDirs,int>(System.Linq.Parallel.QueryTaskGroupState groupState, System.Linq.Parallel.PartitionedStream<ConsoleApplication1.DirWithSubDirs,int> partitions, System.Threading.Tasks.TaskScheduler taskScheduler) Line 172  C#
// etc..

每当你看到这样的事情时,你应该立即思考消防软管的问题。可能是线程中第三常见的错误,仅次于争用和死锁。

现在你知道原因了,代码的问题是每个完成的线程都会增加N个线程。其中,N是一个目录中子目录的平均数量。实际上,线程数量以指数方式增长,这总是很糟糕的。它只会在N=1的情况下保持控制,当然这在典型的磁盘上永远不会发生。

请注意,就像几乎任何线程问题一样,这种不当行为往往会糟糕地重复。机器中的SSD往往会隐藏它。机器中的RAM也是如此,程序很可能会在第二次运行时快速完成并无故障。由于您现在将从文件系统缓存而不是磁盘读取,因此速度非常快。修补ThreadPool.SetMinThreads()也隐藏了它,但它无法修复它。它从不修复任何问题,只隐藏它们。因为无论发生什么情况,指数总是会超过设置的最小线程数。您只能希望它在这之前完成对驱动器的迭代。对于拥有大硬盘的用户来说,这是一种闲置的希望。

ParallelEnumerable.ForAll()和Parallel.ForEach()之间的区别现在可能也很容易解释了。从堆栈跟踪中可以看出ForAll()做了一些顽皮的事情,RunSynchronously()方法会阻塞,直到所有线程都完成。阻塞是线程池线程不应该做的事情,它会阻塞线程池,不允许它为另一个作业调度处理器。正如您所观察到的效果,线程池很快就会被等待N个其他线程完成的线程淹没。这并没有发生,他们在游泳池里等待,没有得到安排,因为他们中已经有很多人在活动了。

这是一种死锁情况,非常常见,但线程池管理器有一个解决方法。它监视活动的线程池线程,并在它们没有及时完成时介入。然后,它允许启动一个额外的线程,比SetMinThreads()设置的最小值多一个。但不超过SetMaxThreads()设置的最大值,拥有过多的活动tp线程是有风险的,可能会触发OOM。这确实解决了死锁,它完成了其中一个ForAll()调用。但这种情况发生的速度非常慢,线程池每秒只执行两次。在它赶上之前,你会失去耐心的。

Parallel.ForEach()没有这个问题,它不会阻塞,所以不会阻塞池。

这似乎是一个解决方案,但请记住,您的程序仍在对机器的内存进行烧录,向池中添加更多等待的tp线程。这也可能会使程序崩溃,只是不太可能,因为你有很多内存,而线程池不会使用太多内存来跟踪请求。然而,一些程序员也能做到这一点。

解决方案非常简单,只是不要使用线程。这是有害的,当您只有一个磁盘时,没有并发。它确实而不像那样被多个线程征用。特别是在主轴驱动器上,磁头查找非常非常慢。固态硬盘做得更好,但它仍然需要轻松的50微秒,这是你不想要或不需要的开销。访问无法很好缓存的磁盘的理想线程数总是一个

首先要注意的是,您正在尝试并行IO绑定操作,这将严重扭曲时序。

第二件需要注意的事情是并行任务的性质:您正在递归地递减目录树。如果创建多个线程来执行此操作,则每个线程很可能同时访问磁盘的不同部分,这将导致磁盘读取头到处跳跃,并大大降低速度。

尝试更改测试以创建内存中的树,并使用多个线程访问该树。然后,您将能够正确地比较时间,而不会使结果失真,超出所有用途。

此外,您可能正在创建大量线程,它们(默认情况下)将是线程池线程。当线程数量超过处理器核心数量时,拥有大量线程实际上会减慢速度。

还要注意,当您超过线程池最小线程数(由ThreadPool.GetMinThreads()定义)时,线程池管理器会在每个新的线程池线程创建之间引入延迟。(我认为这大约是每个新线程0.5秒)。

此外,如果线程数超过ThreadPool.GetMaxThreads()返回的值,则创建线程将阻塞,直到其他线程之一退出为止。我认为这种情况很可能会发生。

您可以通过调用ThreadPool.SetMaxThreads()ThreadPool.SetMinThreads()来增加这些值来检验这个假设,看看它是否有任何不同。

(最后,请注意,如果你真的试图从C:'递归下降,那么当它到达受保护的OS文件夹时,你几乎肯定会遇到IO异常。)

注意:设置最大/最小线程池线程如下:

ThreadPool.SetMinThreads(4000, 16);
ThreadPool.SetMaxThreads(4000, 16);

后续

我已经用如上所述设置的线程池线程数尝试了您的测试代码,结果如下(不是在整个C:''驱动器上运行,而是在较小的子集上运行):

  • 模式1耗时06.5秒
  • 模式2耗时15.7秒
  • 模式3耗时16.4秒

这符合我的期望;添加大量的线程来实现这一点实际上比单线程要慢,而且这两种并行方法所花费的时间大致相同。


如果其他人想对此进行调查,这里有一些决定性的测试代码(OP的代码是不可复制的,因为我们不知道他的目录结构)。

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
namespace Demo
{
    internal class Program
    {
        private static DirWithSubDirs RootDir;
        private static void Main()
        {
            Console.WriteLine("Loading file system into memory...");
            RootDir = new DirWithSubDirs("Root", 4, 4);
            Console.WriteLine("Done");
            //ThreadPool.SetMinThreads(4000, 16);
            //ThreadPool.SetMaxThreads(4000, 16);
            var w = Stopwatch.StartNew();
            ThisIsARecursiveFunctionInMemory(RootDir);
            Console.WriteLine("Elapsed seconds: " + w.Elapsed.TotalSeconds);
            Console.ReadKey();
        }
        public static void ThisIsARecursiveFunctionInMemory(DirWithSubDirs currentDirectory)
        {
            var depth = currentDirectory.Path.Count(t => t == '''');
            Console.WriteLine(depth + ": " + currentDirectory.Path);
            var children = currentDirectory.SubDirs;
            //Edit this mode to switch what way of parallelization it should use
            int mode = 3;
            switch (mode)
            {
                case 1:
                    foreach (var child in children)
                    {
                        ThisIsARecursiveFunctionInMemory(child);
                    }
                    break;
                case 2:
                    children.AsParallel().ForAll(t =>
                    {
                        ThisIsARecursiveFunctionInMemory(t);
                    });
                    break;
                case 3:
                    Parallel.ForEach(children, t =>
                    {
                        ThisIsARecursiveFunctionInMemory(t);
                    });
                    break;
                default:
                    break;
            }
        }
    }
    internal class DirWithSubDirs
    {
        public List<DirWithSubDirs> SubDirs = new List<DirWithSubDirs>();
        public String Path { get; private set; }
        public DirWithSubDirs(String path, int width, int depth)
        {
            this.Path = path;
            if (depth > 0)
                for (int i = 0; i < width; ++i)
                    SubDirs.Add(new DirWithSubDirs(path + "''" + i, width, depth - 1));
        }
    }
}

Parallel.For和.ForEach方法在内部实现,相当于在任务中运行迭代,例如,类似于的循环

Parallel.For(0, N, i => 
{ 
  DoWork(i); 
});

相当于:

var tasks = new List<Task>(N); 
for(int i=0; i<N; i++) 
{ 
tasks.Add(Task.Factory.StartNew(state => DoWork((int)state), i)); 
} 
Task.WaitAll(tasks.ToArray());

从每个迭代可能与其他迭代并行运行的角度来看,这是一个ok心理模型,但在现实中不会发生。事实上,并行并不一定每个迭代使用一个任务,因为这比必要的开销要大得多。Parallel.ForEach尝试使用尽可能快地完成循环所需的最小任务数。当线程可以处理这些任务时,它会启动任务,并且每个任务都参与一个管理方案(我认为这叫做分块):一个任务要求完成多次迭代,得到它们,然后处理这些工作,然后返回进行更多迭代。块大小根据参与的任务数量、机器上的负载等而变化。

PLINQ的.AsPalallel()有一个不同的实现,但它仍然可以类似地将多次迭代提取到临时存储中,在线程中进行计算(但不是作为任务),并将查询结果放入小缓冲区。(您可以获得一些基于ParallelQuery的东西,然后进一步将.Whatever()函数绑定到一组提供并行实现的替代扩展方法)。

既然我们对这两种机制的工作原理有了一个小的了解,我将尝试为您最初的问题提供一个答案:

为什么.AsPalallel()比Parallel.ForEach慢?原因如下。任务(或此处的等效实现)在类似I/O的调用上执行NOT阻塞。它们"等待"并释放CPU来做其他事情。但是(引用C#果壳书):"PLINQ不能在不阻塞线程的情况下执行I/O绑定工作"。调用是同步。编写它们的目的是,如果(并且只有当)你正在做这样的事情,比如每个任务下载网页,而不占用CPU时间,那么你就可以提高并行度。

您的函数调用与I/O绑定调用完全类似的原因是:您的一个线程(称为T)阻塞,在其所有子线程完成之前什么都不做,这可能是一个缓慢的过程。T本身在等待子代解锁时不是CPU密集型的,它什么也不做,只是在等待。因此,它与典型的I/O绑定函数调用相同。

基于AsParallel究竟是如何工作的公认答案?

.AsParallel.ForAll()在调用.ForAll() 之前强制转换回IEnumerable

因此它创建了1个新线程+N个递归调用(每个调用生成一个新线程)。