What does MaxDegreeOfParallelism do?

本文关键字:do MaxDegreeOfParallelism does What | 更新日期: 2023-09-27 18:26:41

我使用的是Parallel.ForEach,我正在进行一些数据库更新,现在没有设置MaxDegreeOfParallelism,双核处理器机器会导致SQL客户端超时,而四核处理器机器则不会超时。

现在,我无法控制在代码运行的地方可以使用哪种处理器内核,但我是否可以使用MaxDegreeOfParallelism更改一些设置,这些设置可能会同时运行较少的操作,并且不会导致超时?

我可以增加超时,但这不是一个好的解决方案,如果在较低的CPU上,我可以同时处理较少的操作,这将减少CPU的负载。

好的,我也读过所有其他的帖子和MSDN,但将MaxDegreeOfParallelism设置为较低的值会让我的四核机器受到影响吗?

例如,有没有办法做这样的事情,如果CPU有两个核心,那么使用20,如果CPU只有四个核心,然后使用40?

What does MaxDegreeOfParallelism do?

答案是,无论内核数量如何,它都是整个并行操作的上限。

因此,即使您因为等待IO或锁而不使用CPU,也不会并行运行任何额外的任务,只有您指定的最大任务。

为了找到答案,我写了一段测试代码。那里有一个人工锁,可以刺激TPL使用更多的线程。当您的代码正在等待IO或数据库时,也会发生同样的情况。

class Program
{
    static void Main(string[] args)
    {
        var locker = new Object();
        int count = 0;
        Parallel.For
            (0
             , 1000
             , new ParallelOptions { MaxDegreeOfParallelism = 2 }
             , (i) =>
                   {
                       Interlocked.Increment(ref count);
                       lock (locker)
                       {
                           Console.WriteLine("Number of active threads:" + count);
                           Thread.Sleep(10);
                        }
                        Interlocked.Decrement(ref count);
                    }
            );
    }
}

如果我没有指定MaxDegreeOfParallelism,控制台日志显示最多有8个任务同时运行。像这样:

Number of active threads:6
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:6
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7

它开始时更低,随着时间的推移而增加,最后它试图同时运行8。

如果我把它限制在某个任意值(比如2),我就会得到

Number of active threads:2
Number of active threads:1
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2

哦,这是在一台四核机器上。

例如,有没有办法做这样的事情,如果CPU有两个核心,那么使用20,如果CPU只有四个核心,然后使用40?

这样做可以使并行性取决于CPU核心的数量:

var options = new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount * 10 };
Parallel.ForEach(sourceCollection, options, sourceItem =>
{
    // do something
});

然而,较新的CPU倾向于使用超线程来模拟额外的内核。因此,如果您有一个四核处理器,那么Environment.ProcessorCount可能会将其报告为8核。我发现,如果将并行度设置为考虑模拟内核,那么它实际上会减慢其他线程(如UI线程)的速度。

因此,尽管操作会更快地完成,但在此期间,应用程序UI可能会经历显著的滞后。将"Environment.ProcessorCount"除以2似乎可以获得相同的处理速度,同时仍然保持CPU可用于UI线程。

听起来你并行运行的代码是死锁的,这意味着除非你能找到并解决导致死锁的问题,否则你根本不应该并行化它。

Parallel.ForEach方法在内部启动多个Task,这些任务中的每一个都重复地从source序列中获取一个项,并为此项调用body委托。MaxDegreeOfParallelism可以设置这些内部任务的上限。但这种设置并不是限制并行性的唯一因素。还存在CCD_ 11执行由CCD_。

生成机制通过每个生成的任务复制自身来工作。按照顺序,每个任务要做的第一件事就是创建另一个任务。大多数TaskScheduler都有并发执行任务数量的限制,当达到这个限制时,它们会对下一个传入任务进行排队,而不会立即执行。因此,Parallel.ForEach的自复制模式最终将停止生成更多的任务,因为生成的最后一个任务将闲置在TaskScheduler的队列中。

让我们讨论一下TaskScheduler.Default,它是Parallel.ForEach的默认调度器,并在ThreadPool上调度任务。ThreadPool具有软极限和硬极限。软限制是指工作需求没有立即得到满足,而硬限制是指在已经运行的工作项完成之前,工作需求从未得到满足。当ThreadPool达到软限制(默认为Environment.ProcessorCount)时,它会以每秒一个新线程的频率产生更多线程来满足需求。软限制可以使用ThreadPool.SetMinThreads方法进行配置。硬极限可以用ThreadPool.GetMaxThreads方法找到,在我的机器中是32767个线程。

因此,如果我用MaxDegreeOfParallelism = 20配置我的4核机器中的Parallel.ForEach,并且body委托使当前线程忙碌一秒钟以上,则有效并行度将从5开始,然后在接下来的15秒内逐渐增加,直到它变为20,并且它将保持在20,直到循环完成。它以5而不是4开头的原因是Parallel.ForEach还使用当前线程以及ThreadPool

如果我不配置MaxDegreeOfParallelism,它将与使用值-1配置它相同,这意味着无限制的并行性。在这种情况下,ThreadPool可用性将是实际并行度的唯一限制因素。只要Parallel.ForEach运行,ThreadPool就会饱和,换句话说,它将处于供应不断被需求超越的情况。每当ThreadPool派生出一个新线程时,该线程将选择Parallel.ForEach之前调度的最后一个任务,该任务将立即复制自身,并且副本将进入ThreadPool的队列。假设Parallel.ForEach将运行足够长的时间,ThreadPool将达到其最大大小(在我的机器中为32767),并将保持在这个级别,直到循环完成。这是假设进程不会因为缺少内存等其他资源而崩溃。

CCD_ 39财产的官方文件指出;通常,您不需要修改此设置"。显然,自从引入带有.NET Framework 4.0的TPL(2010年)以来,就一直是这样。在这一点上,你可能已经开始质疑这个建议的有效性。我也是,所以我在dotnet/运行库上发布了一个问题,询问给定的建议是否仍然有效或过时。我很惊讶地收到反馈,说这个建议和以前一样有效。Microsoft的论点是,将MaxDegreeOfParallelism限制为值Environment.ProcessorCount可能会导致性能回归,甚至在某些情况下会导致死锁。我以几个例子作为回应,展示了当未配置的Parallel.ForEach在启用异步的应用程序中运行时可能出现的问题行为,其中其他事情与并行循环同时发生。这些演示被认为是不具代表性的,因为我使用了Thread.Sleep方法来模拟循环中的工作。

我个人的建议是:无论何时使用任何Parallel方法,都要始终指定MaxDegreeOfParallelism,即使您对默认值感到满意。如果你相信我关于使ThreadPool饱和的不可取性的论点,你可以用一个合适的值来配置它,比如Environment.ProcessorCount。如果你相信微软关于性能和死锁的论点,你可以用-1来配置它。在任何一种情况下,任何看到你的代码的人都会被暗示你做出了有意识和知情的决定。

ccoThreadPool的注入速率未记录在案。";每秒一个新线程"是一个实验观察结果

需要考虑的另一件事是,根据您的情况,通常最好收集DataTable中的所有数据,然后在每个主要任务结束时使用SqlBulkCopy,尤其是对于那些多年后发现这种情况的人。

例如,我有一个流程,它运行了数百万个文件,当每个文件事务都进行DB查询以插入记录时,我也遇到了同样的错误。相反,我将其全部存储在内存中的DataTable中,用于迭代的每个共享,将DataTable转储到SQL Server中,并在每个单独的共享之间清除它。大容量插入只需一瞬间,而且不会同时打开数千个连接。

编辑:这里有一个快速&肮脏的工作实例SQLBulkCopy方法:

private static void updateDatabase(DataTable targetTable)
    {
        try
        {
            DataSet ds = new DataSet("FileFolderAttribute");
            ds.Tables.Add(targetTable);
            writeToLog(targetTable.TableName + " - Rows: " + targetTable.Rows.Count, logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
            writeToLog(@"Opening SQL connection", logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
            Console.WriteLine(@"Opening SQL connection");
            SqlConnection sqlConnection = new SqlConnection(sqlConnectionString);
            sqlConnection.Open();
            SqlBulkCopy bulkCopy = new SqlBulkCopy(sqlConnection, SqlBulkCopyOptions.TableLock | SqlBulkCopyOptions.FireTriggers | SqlBulkCopyOptions.UseInternalTransaction, null);
            bulkCopy.DestinationTableName = "FileFolderAttribute";
            writeToLog(@"Copying data to SQL Server table", logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
            Console.WriteLine(@"Copying data to SQL Server table");
            foreach (var table in ds.Tables)
            {
                writeToLog(table.ToString(), logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
                Console.WriteLine(table.ToString());
            }
            bulkCopy.WriteToServer(ds.Tables[0]);
            sqlConnection.Close();
            sqlConnection.Dispose();
            writeToLog(@"Closing SQL connection", logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
            writeToLog(@"Clearing local DataTable...", logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
            Console.WriteLine(@"Closing SQL connection");
            Console.WriteLine(@"Clearing local DataTable...");
            targetTable.Clear();
            ds.Tables.Remove(targetTable);
            ds.Clear();
            ds.Dispose();
        }
        catch (Exception error)
        {
            errorLogging(error, getCurrentMethod(), logDatabaseFile);
        }
    }

并将其转储到数据表中:

private static void writeToDataTable(string ServerHostname, string RootDirectory, string RecordType, string Path, string PathDirectory, string PathFileName, string PathFileExtension, decimal SizeBytes, decimal SizeMB, DateTime DateCreated, DateTime DateModified, DateTime DateLastAccessed, string Owner, int PathLength, DateTime RecordWriteDateTime)
    {
        try
        {
            if (tableToggle)
            {
                DataRow toInsert = results_1.NewRow();
                toInsert[0] = ServerHostname;
                toInsert[1] = RootDirectory;
                toInsert[2] = RecordType;
                toInsert[3] = Path;
                toInsert[4] = PathDirectory;
                toInsert[5] = PathFileName;
                toInsert[6] = PathFileExtension;
                toInsert[7] = SizeBytes;
                toInsert[8] = SizeMB;
                toInsert[9] = DateCreated;
                toInsert[10] = DateModified;
                toInsert[11] = DateLastAccessed;
                toInsert[12] = Owner;
                toInsert[13] = PathLength;
                toInsert[14] = RecordWriteDateTime;
                results_1.Rows.Add(toInsert);
            }
            else
            {
                DataRow toInsert = results_2.NewRow();
                toInsert[0] = ServerHostname;
                toInsert[1] = RootDirectory;
                toInsert[2] = RecordType;
                toInsert[3] = Path;
                toInsert[4] = PathDirectory;
                toInsert[5] = PathFileName;
                toInsert[6] = PathFileExtension;
                toInsert[7] = SizeBytes;
                toInsert[8] = SizeMB;
                toInsert[9] = DateCreated;
                toInsert[10] = DateModified;
                toInsert[11] = DateLastAccessed;
                toInsert[12] = Owner;
                toInsert[13] = PathLength;
                toInsert[14] = RecordWriteDateTime;
                results_2.Rows.Add(toInsert);
            }

        }
        catch (Exception error)
        {
            errorLogging(error, getCurrentMethod(), logFile);
        }
    }

这是上下文,循环片段本身:

private static void processTargetDirectory(DirectoryInfo rootDirectory, string targetPathRoot)
    {
        DateTime StartTime = DateTime.Now;
        int directoryCount = 0;
        int fileCount = 0;
        try
        {                
            manageDataTables();
            Console.WriteLine(rootDirectory.FullName);
            writeToLog(@"Working in Directory: " + rootDirectory.FullName, logFile, getLineNumber(), getCurrentMethod(), true);
            applicationsDirectoryCount++;
            // REPORT DIRECTORY INFO //
            string directoryOwner = "";
            try
            {
                directoryOwner = File.GetAccessControl(rootDirectory.FullName).GetOwner(typeof(System.Security.Principal.NTAccount)).ToString();
            }
            catch (Exception error)
            {
                //writeToLog("'t" + rootDirectory.FullName, logExceptionsFile, getLineNumber(), getCurrentMethod(), true);
                writeToLog("[" + error.Message + "] - " + rootDirectory.FullName, logExceptionsFile, getLineNumber(), getCurrentMethod(), true);
                errorLogging(error, getCurrentMethod(), logFile);
                directoryOwner = "SeparatedUser";
            }
            writeToRawLog(serverHostname + "," + targetPathRoot + "," + "Directory" + "," + rootDirectory.Name + "," + rootDirectory.Extension + "," + 0 + "," + 0 + "," + rootDirectory.CreationTime + "," + rootDirectory.LastWriteTime + "," + rootDirectory.LastAccessTime + "," + directoryOwner + "," + rootDirectory.FullName.Length + "," + DateTime.Now + "," + rootDirectory.FullName + "," + "", logResultsFile, true, logFile);
            //writeToDBLog(serverHostname, targetPathRoot, "Directory", rootDirectory.FullName, "", rootDirectory.Name, rootDirectory.Extension, 0, 0, rootDirectory.CreationTime, rootDirectory.LastWriteTime, rootDirectory.LastAccessTime, directoryOwner, rootDirectory.FullName.Length, DateTime.Now);
            writeToDataTable(serverHostname, targetPathRoot, "Directory", rootDirectory.FullName, "", rootDirectory.Name, rootDirectory.Extension, 0, 0, rootDirectory.CreationTime, rootDirectory.LastWriteTime, rootDirectory.LastAccessTime, directoryOwner, rootDirectory.FullName.Length, DateTime.Now);
            if (rootDirectory.GetDirectories().Length > 0)
            {
                Parallel.ForEach(rootDirectory.GetDirectories(), new ParallelOptions { MaxDegreeOfParallelism = directoryDegreeOfParallelism }, dir =>
                {
                    directoryCount++;
                    Interlocked.Increment(ref threadCount);
                    processTargetDirectory(dir, targetPathRoot);
                });
            }
            // REPORT FILE INFO //
            Parallel.ForEach(rootDirectory.GetFiles(), new ParallelOptions { MaxDegreeOfParallelism = fileDegreeOfParallelism }, file =>
            {
                applicationsFileCount++;
                fileCount++;
                Interlocked.Increment(ref threadCount);
                processTargetFile(file, targetPathRoot);
            });
        }
        catch (Exception error)
        {
            writeToLog(error.Message, logExceptionsFile, getLineNumber(), getCurrentMethod(), true);
            errorLogging(error, getCurrentMethod(), logFile);
        }
        finally
        {
            Interlocked.Decrement(ref threadCount);
        }
        DateTime EndTime = DateTime.Now;
        writeToLog(@"Run time for " + rootDirectory.FullName + @" is: " + (EndTime - StartTime).ToString() + @" | File Count: " + fileCount + @", Directory Count: " + directoryCount, logTimingFile, getLineNumber(), getCurrentMethod(), true);
    }

就像上面提到的,这是快速的&脏,但效果很好。

对于我在获得大约2000000条记录后遇到的与内存相关的问题,我必须创建第二个DataTable并在2条记录之间交替,在交替之间将记录转储到SQL server。因此,我的SQL连接由每100000条记录中的1条组成。

我是这样处理的:

private static void manageDataTables()
    {
        try
        {
            Console.WriteLine(@"[Checking datatable size] toggleValue: " + tableToggle + " | " + @"r1: " + results_1.Rows.Count + " - " + @"r2: " + results_2.Rows.Count);
            if (tableToggle)
            {
                int rowCount = 0;
                if (results_1.Rows.Count > datatableRecordCountThreshhold)
                {
                    tableToggle ^= true;
                    writeToLog(@"results_1 row count > 100000 @ " + results_1.Rows.Count, logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
                    rowCount = results_1.Rows.Count;
                    logResultsFile = "FileServerReport_Results_" + DateTime.Now.ToString("yyyyMMdd-HHmmss") + ".txt";
                    Thread.Sleep(5000);
                    if (results_1.Rows.Count != rowCount)
                    {
                        writeToLog(@"results_1 row count increased, @ " + results_1.Rows.Count, logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
                        rowCount = results_1.Rows.Count;
                        Thread.Sleep(15000);
                    }
                    writeToLog(@"results_1 row count stopped increasing, updating database...", logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
                    updateDatabase(results_1);
                    results_1.Clear();
                    writeToLog(@"results_1 cleared, count: " + results_1.Rows.Count, logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
                }
            }
            else
            {
                int rowCount = 0;
                if (results_2.Rows.Count > datatableRecordCountThreshhold)
                {
                    tableToggle ^= true;
                    writeToLog(@"results_2 row count > 100000 @ " + results_2.Rows.Count, logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
                    rowCount = results_2.Rows.Count;
                    logResultsFile = "FileServerReport_Results_" + DateTime.Now.ToString("yyyyMMdd-HHmmss") + ".txt";
                    Thread.Sleep(5000);
                    if (results_2.Rows.Count != rowCount)
                    {
                        writeToLog(@"results_2 row count increased, @ " + results_2.Rows.Count, logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
                        rowCount = results_2.Rows.Count;
                        Thread.Sleep(15000);
                    }
                    writeToLog(@"results_2 row count stopped increasing, updating database...", logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
                    updateDatabase(results_2);
                    results_2.Clear();
                    writeToLog(@"results_2 cleared, count: " + results_2.Rows.Count, logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
                }
            }
        }
        catch (Exception error)
        {
            errorLogging(error, getCurrentMethod(), logDatabaseFile);
        }
    }

其中"datatableRecordCountThresholdhold=100000"

它设置并行运行的线程数。。。