是否有可能强制TPL总是在不同的Parallel.For中使用相同的线程索引?

本文关键字:For 索引 线程 Parallel 有可能 TPL 是否 | 更新日期: 2023-09-27 18:04:16

我有一个使用Parallel.For的内存问题。

是否有可能强制TPL在不同的Parallel.For中始终使用相同的线程索引?

即使我在所有不同的并行中使用MaxDegreeOfParallelism = Environment.ProcessorCount。For, TPL在两个连续的Parallel.For之间并不完全使用相同的线程池。我有记忆问题,因为这个问题。

我不明白为什么如果我的Environment.ProcessorCount = 2, TPL不在开始时只创建2个线程,并将其用于所有连续的Parallel.For。这是我的目标。

我怎么能做到呢?

是否有可能强制TPL总是在不同的Parallel.For中使用相同的线程索引?

不,不可能。

如果你想手动控制线程,TPL不是正确的抽象。使用系统。线程直接。

这是可行的,但您需要使用自定义TaskScheduler配置并行循环,这很棘手。这个TaskScheduler实际上是一个自定义线程池,具有固定数量的线程,与内置的ThreadPool完全无关。启动和终止这些线程的责任将完全取决于您。下面是这样一个池的最小实现:

public class CustomThreadPool : TaskScheduler, IDisposable
{
    private readonly BlockingCollection<Task> _queue;
    private readonly Thread[] _threads;
    public CustomThreadPool(int threadsCount)
    {
        _queue = new BlockingCollection<Task>();
        _threads = Enumerable.Range(0, threadsCount).Select(_ => new Thread(() =>
        {
            foreach (var task in _queue.GetConsumingEnumerable())
                TryExecuteTask(task);
        })).ToArray();
        Array.ForEach(_threads, t => t.IsBackground = true);
        Array.ForEach(_threads, t => t.Start());
    }
    protected override void QueueTask(Task task) => _queue.Add(task);
    protected override bool TryExecuteTaskInline(Task task,
        bool taskWasPreviouslyQueued)
    {
        if (Array.IndexOf(_threads, Thread.CurrentThread) < 0) return false;
        return TryExecuteTask(task);
    }
    public override int MaximumConcurrencyLevel => _threads.Length;
    protected override IEnumerable<Task> GetScheduledTasks() => _queue;
    public void Dispose()
    {
        _queue.CompleteAdding();
        Array.ForEach(_threads, t => t.Join());
        _queue.Dispose();
    }
}

你可以像这样使用Parallel.For循环:

using CustomThreadPool customThreadPool = new(threadsCount: 2);
ParallelOptions options = new()
{
    TaskScheduler = customThreadPool,
    MaxDegreeOfParallelism = customThreadPool.MaximumConcurrencyLevel,
};
Parallel.For(1, 10, options, i =>
{
    Console.WriteLine($"i: {i}, Thread #{Thread.CurrentThread.ManagedThreadId}");
    Thread.Sleep(100);
});
样本输出:

i: 5, Thread #4
i: 1, Thread #5
i: 2, Thread #5
i: 6, Thread #4
i: 3, Thread #5
i: 7, Thread #4
i: 4, Thread #5
i: 8, Thread #4
i: 9, Thread #5

在线演示。

只要你使用相同的CustomThreadPool实例,你所有的并行循环将在相同的线程#4和#5上运行。