确定任务工厂生成作业的线程数的因素
本文关键字:线程 作业 任务 工厂 | 更新日期: 2023-09-27 18:31:17
我有以下代码:
var factory = new TaskFactory();
for (int i = 0; i < 100; i++)
{
var i1 = i;
factory.StartNew(() => foo(i1));
}
static void foo(int i)
{
Thread.Sleep(1000);
Console.WriteLine($"foo{i} - on thread {Thread.CurrentThread.ManagedThreadId}");
}
我可以看到它一次只做 4 个线程(基于观察)。 我的问题:
- 什么决定了一次使用的线程数?
- 如何检索此号码?
- 如何更改此号码?
附言我的盒子有 4 个核心。
附言 我需要有特定数量的任务(仅此而已)由 TPL 并发处理,最终得到以下代码:
private static int count = 0; // keep track of how many concurrent tasks are running
private static void SemaphoreImplementation()
{
var s = new Semaphore(20, 20); // allow 20 tasks at a time
for (int i = 0; i < 1000; i++)
{
var i1 = i;
Task.Factory.StartNew(() =>
{
try
{
s.WaitOne();
Interlocked.Increment(ref count);
foo(i1);
}
finally
{
s.Release();
Interlocked.Decrement(ref count);
}
}, TaskCreationOptions.LongRunning);
}
}
static void foo(int i)
{
Thread.Sleep(100);
Console.WriteLine($"foo{i:00} - on thread " +
$"{Thread.CurrentThread.ManagedThreadId:00}. Executing concurently: {count}");
}
当您在 .NET 中使用Task
时,您告诉 TPL 计划要在ThreadPool
上执行的工作(通过 TaskScheduler
)。请注意,工作将尽早安排,并且计划程序认为合适。这意味着TaskScheduler
将决定将使用多少个线程来运行n
数量的任务,以及在哪个线程上执行哪个任务。
TPL 经过了很好的调整,并在执行任务时继续调整其算法。因此,在大多数情况下,它会尝试最大程度地减少争用。这意味着如果您正在运行 100 个任务并且只有 4 个内核(您可以使用 Environment.ProcessorCount
获得),则在任何给定时间执行超过 4 个线程是没有意义的,否则它将需要做更多的上下文切换。现在,有时您希望显式覆盖此行为。假设您需要等待某种IO完成,这是一个完全不同的故事。
总之,信任TPL。但是,如果您坚持为每个任务生成一个线程(并不总是一个好主意!),您可以使用:
Task.Factory.StartNew(
() => /* your piece of work */,
TaskCreationOptions.LongRunning);
这会告诉默认Taskscheduler
为该工作显式生成一个新线程。
您也可以使用自己的Scheduler
并将其传递给TaskFactory
。你可以找到一大堆Schedulers
HERE
.
请注意,另一种选择是使用 PLINQ
默认情况下,它再次分析您的查询并决定并行化它是否会产生任何好处,同样在阻塞 IO 的情况下,您确定启动多个线程将导致更好的执行,您可以使用WithExecutionMode(ParallelExecutionMode.ForceParallelism)
然后可以使用 WithDegreeOfParallelism, 给出有关使用多少线程的提示,但请记住,不能保证您会获得那么多线程,如 MSDN 所说:
设置要在查询中使用的并行度。程度 并行度是并发执行任务的最大数量 将用于处理查询。
最后,我强烈建议阅读THIS
关于Threading
和TPL
的系列文章。
任务数增加到例如 1000000,您将看到随着时间的推移生成更多的线程。TPL 倾向于每 500 毫秒注入一次。
TPL 线程池不理解 IO 绑定工作负载(睡眠是 IO)。在这些情况下,依靠 TPL 来选择正确的并行度不是一个好主意。TPL 完全没有头绪,并且根据对吞吐量的模糊猜测注入更多线程。也是为了避免死锁。
在这里,TPL 策略显然没有用,因为您添加的线程越多,获得的吞吐量就越多。在这种人为的情况下,每个线程每秒可以处理一个项目。TPL对此一无所知。将线程计数限制为内核数是没有意义的。
什么决定了一次使用的线程数?
几乎没有记录的 TPL 启发式方法。他们经常出错。特别是在这种情况下,它们会随着时间的推移生成无限数量的线程。使用任务管理器亲自查看。让它运行一个小时,您将拥有 1000 个线程。
如何检索此号码?如何更改此号码?
您可以检索其中一些数字,但这不是正确的方法。如果您需要有保证的 DOP,您可以使用AsParallel().WithDegreeOfParallelism(...)
或自定义任务计划程序。您也可以手动启动LongRunning
任务。不要弄乱进程全局设置。
我建议使用 SemaphoreSlim
,因为它不使用 Windows 内核(因此它可以在 Linux C# 微服务中使用),并且还有一个属性SemaphoreSlim.CurrentCount
,可以告诉还剩下多少线程,因此您不需要Interlocked.Increment
或Interlocked.Decrement
。我还删除了i1
i
因为它是值类型,并且不会通过调用传递 i
参数的方法foo
更改它,因此无需将其复制到 i1
中以确保它永远不会更改(如果这是添加i1
的原因):
private static void SemaphoreImplementation()
{
var maxThreadsCount = 20; // allow 20 tasks at a time
var semaphoreSlim = new SemaphoreSlim(maxTasksCount, maxTasksCount);
var taskFactory = new TaskFactory();
for (int i = 0; i < 1000; i++)
{
taskFactory.StartNew(async () =>
{
try
{
await semaphoreSlim.WaitAsync();
var count = maxTasksCount-semaphoreSlim.CurrentCount; //SemaphoreSlim.CurrentCount tells how many threads are remaining
await foo(i, count);
}
finally
{
semaphoreSlim.Release();
}
}, TaskCreationOptions.LongRunning);
}
}
static async void foo(int i, int count)
{
await Task.Wait(100);
Console.WriteLine($"foo{i:00} - on thread " +
$"{Thread.CurrentThread.ManagedThreadId:00}. Executing concurently: {count}");
}