PLINQ:如何在超过4个线程上运行并行查询

本文关键字:线程 运行 并行 查询 4个 PLINQ | 更新日期: 2023-09-27 18:03:46

更新-更改问题的标题以反映我真正想要的

考虑下面这段代码:
// this query generates 12 instances of Func<int>, which each when executed
// print something to the console and wait for 1 second.
var actions = Enumerable.Range(0, 12).Select(i => new Func<int>(() =>
{
    Console.WriteLine("{0} - waiting 1 sec", i);
    Thread.Sleep(1000);
    return 1;
}));
// define a parallel query. Note the WithDegreeOfParallelism call here.
var query = from action in actions.AsParallel().WithDegreeOfParallelism(12)
            select action();
// execute, measuring total duration
var stopw = Stopwatch.StartNew();
query.ToList();
Console.WriteLine(stopw.Elapsed);
Console.WriteLine(Environment.ProcessorCount); // 3 on my machine

当省略对WithDegreeOfParallelism的调用时,它分4个块执行,总共花费大约4秒,这是我所期望的,因为我的CPU计数是3。

然而,当使用4以上的任何号码调用WithDegreeOfParallelism时,我总是得到3个块,并且总持续时间不低于3秒。我希望值为12的总持续时间为(略多于)1秒。

我错过了什么?我如何强制并行执行超过4个非cpu密集型任务,这就是我所追求的?

更新:我当然可以回到手动旋转线程,但我希望新的PFX库会使这更容易一点…无论如何,下面的代码给了我大约1秒的总执行时间

List<Thread> threads = new List<Thread>();
for (int i = 0; i < 12; i++)
{
    int i1 = i;
    threads.Add(new Thread(() =>
    {
        Console.WriteLine(i1);
        Thread.Sleep(1000);
    }));
}
threads.ForEach(t => t.Start());
threads.ForEach(t => t.Join());

PLINQ:如何在超过4个线程上运行并行查询

尝试在并行循环中使用TaskCreationOptions.LongRunning选项启动新任务。它们将立即启动,而不是等待线程池上的线程可用。

正如我所说,WithDegreeOfParallelism只设置了一个上界。试着把你的任务从10个增加到100个。你最终会有大约10秒的时间来完成所有100个任务。您的代码适用于具有较小操作的大量任务。并在任务中添加Console.WriteLine("{0} threads " ,Process.GetCurrentProcess().Threads.Count);,然后可以看到创建了多少线程。(线程数不是plinq创建的线程数。

在PLinq中有很多方法可以实现并行。阅读这篇文章http://msdn.microsoft.com/en-us/library/dd997411.aspx。您需要根据相关需求选择最佳方式,以获得更好的性能。

WithDegreeOfParallelism指示PLINQ应该创建多少个任务,但不一定要使用多少个线程。

由于任务作为工作项在ThreadPool上执行,因此执行查询的线程数量将受到ThreadPool大小的限制。ThreadPool将根据需要添加线程,但这可能需要一段时间- ThreadPool可能每秒添加2个线程左右。

如果你想快速添加线程到ThreadPool,你可以使用SetMinThreads方法。如果将这段代码放在代码的开头,测试应该在一秒钟左右完成:

int prevThreads, prevPorts;
ThreadPool.GetMinThreads(out prevThreads, out prevPorts);
ThreadPool.SetMinThreads(12, prevPorts);

你可以决定你需要多少线程,然后使用SetMinThreads和SetMaxThreads来设置线程池大小的界限