确定任务工厂生成作业的线程数的因素

本文关键字:线程 作业 任务 工厂 | 更新日期: 2023-09-27 18:31:17

我有以下代码:

var factory = new TaskFactory();
for (int i = 0; i < 100; i++)
{
    var i1 = i;
    factory.StartNew(() => foo(i1));
}
static void foo(int i)
{
    Thread.Sleep(1000);
    Console.WriteLine($"foo{i} - on thread {Thread.CurrentThread.ManagedThreadId}");
}            

我可以看到它一次只做 4 个线程(基于观察)。 我的问题:

  1. 什么决定了一次使用的线程数?
  2. 如何检索此号码?
  3. 如何更改此号码?

附言我的盒子有 4 个核心。

附言 我需要有特定数量的任务(仅此而已)由 TPL 并发处理,最终得到以下代码:

private static int count = 0;   // keep track of how many concurrent tasks are running
private static void SemaphoreImplementation()
{
    var s = new Semaphore(20, 20);  // allow 20 tasks at a time
    for (int i = 0; i < 1000; i++)
    {
        var i1 = i;
        Task.Factory.StartNew(() =>
        {
            try
            {                        
                s.WaitOne();
                Interlocked.Increment(ref count);
                foo(i1);
            }
            finally
            {
                s.Release();
                Interlocked.Decrement(ref count);
            }
        }, TaskCreationOptions.LongRunning);
    }
}
static void foo(int i)
{
    Thread.Sleep(100);
    Console.WriteLine($"foo{i:00} - on thread " + 
            $"{Thread.CurrentThread.ManagedThreadId:00}. Executing concurently: {count}");
}

确定任务工厂生成作业的线程数的因素

当您在 .NET 中使用Task时,您告诉 TPL 计划要在ThreadPool上执行的工作(通过 TaskScheduler )。请注意,工作将尽早安排,并且计划程序认为合适。这意味着TaskScheduler将决定将使用多少个线程来运行n数量的任务,以及在哪个线程上执行哪个任务。

TPL 经过了很好的调整,并在执行任务时继续调整其算法。因此,在大多数情况下,它会尝试最大程度地减少争用。这意味着如果您正在运行 100 个任务并且只有 4 个内核(您可以使用 Environment.ProcessorCount 获得),则在任何给定时间执行超过 4 个线程是没有意义的,否则它将需要做更多的上下文切换。现在,有时您希望显式覆盖此行为。假设您需要等待某种IO完成,这是一个完全不同的故事

总之,信任TPL。但是,如果您坚持为每个任务生成一个线程(并不总是一个好主意!),您可以使用:

Task.Factory.StartNew(
    () => /* your piece of work */, 
    TaskCreationOptions.LongRunning);

这会告诉默认Taskscheduler为该工作显式生成一个新线程。

您也可以使用自己的Scheduler并将其传递给TaskFactory。你可以找到一大堆Schedulers HERE.

请注意,另一种选择是使用 PLINQ 默认情况下,它再次分析您的查询并决定并行化它是否会产生任何好处,同样在阻塞 IO 的情况下,您确定启动多个线程将导致更好的执行,您可以使用WithExecutionMode(ParallelExecutionMode.ForceParallelism)然后可以使用 WithDegreeOfParallelism, 给出有关使用多少线程的提示,但请记住,不能保证您会获得那么多线程如 MSDN 所说:

设置要在查询中使用的并行度。程度 并行度是并发执行任务的最大数量 将用于处理查询。

最后,我强烈建议阅读THIS关于ThreadingTPL的系列文章。

如果将

任务数增加到例如 1000000,您将看到随着时间的推移生成更多的线程。TPL 倾向于每 500 毫秒注入一次。

TPL 线程池不理解 IO 绑定工作负载(睡眠是 IO)。在这些情况下,依靠 TPL 来选择正确的并行度不是一个好主意。TPL 完全没有头绪,并且根据对吞吐量的模糊猜测注入更多线程。也是为了避免死锁。

在这里,TPL 策略显然没有用,因为您添加的线程越多,获得的吞吐量就越多。在这种人为的情况下,每个线程每秒可以处理一个项目。TPL对此一无所知。将线程计数限制为内核数是没有意义的。

什么决定了一次使用的线程数?

几乎没有记录的 TPL 启发式方法。他们经常出错。特别是在这种情况下,它们会随着时间的推移生成无限数量的线程。使用任务管理器亲自查看。让它运行一个小时,您将拥有 1000 个线程。

如何检索此号码?如何更改此号码?

您可以检索其中一些数字,但这不是正确的方法。如果您需要有保证的 DOP,您可以使用AsParallel().WithDegreeOfParallelism(...)或自定义任务计划程序。您也可以手动启动LongRunning任务。不要弄乱进程全局设置。

我建议使用 SemaphoreSlim,因为它不使用 Windows 内核(因此它可以在 Linux C# 微服务中使用),并且还有一个属性SemaphoreSlim.CurrentCount,可以告诉还剩下多少线程,因此您不需要Interlocked.IncrementInterlocked.Decrement。我还删除了i1 i因为它是值类型,并且不会通过调用传递 i 参数的方法foo更改它,因此无需将其复制到 i1 中以确保它永远不会更改(如果这是添加i1的原因):

private static void SemaphoreImplementation()
{
    var maxThreadsCount = 20; // allow 20 tasks at a time
    var semaphoreSlim = new SemaphoreSlim(maxTasksCount, maxTasksCount);
    var taskFactory = new TaskFactory();
    
    for (int i = 0; i < 1000; i++)
    {
        taskFactory.StartNew(async () =>
        {
            try
            {                        
                await semaphoreSlim.WaitAsync();
                var count = maxTasksCount-semaphoreSlim.CurrentCount; //SemaphoreSlim.CurrentCount tells how many threads are remaining
                await foo(i, count);
            }
            finally
            {
                semaphoreSlim.Release();
            }
        }, TaskCreationOptions.LongRunning);
    }
}
static async void foo(int i, int count)
{
    await Task.Wait(100);
    Console.WriteLine($"foo{i:00} - on thread " + 
            $"{Thread.CurrentThread.ManagedThreadId:00}. Executing concurently: {count}");
}