有限并发的TaskScheduler,可以交错要显式排序的任务

本文关键字:排序 任务 并发 TaskScheduler | 更新日期: 2023-09-27 18:28:57

我正在寻找一个TaskScheduler,它:

  1. 允许我定义多个专用线程(例如8个)-标准的LimitedConcurrencyLevelTaskScheduler(使用线程池线程)或WorkStealingTaskScheduler可以这样做
  2. 允许我创建完全有序的子任务调度器,但将任务安排在父调度器的专用线程上

目前,我们将TaskScheduler.Default用于一般池(取决于线程池增长算法等),每当我们想订购任务时,都使用new OrderedTaskScheduler()。我想保持这种行为,但将这两个要求都限制在我自己的专用线程池中。

QueuedTaskScheduler似乎非常接近。我认为返回子TaskScheduler的QueuedTaskScheduler.ActivateNewQueue()方法会在父级的工作线程池上按顺序执行任务,但事实并非如此。子TaskScheduler似乎具有与父TaskScheduler相同的并行化级别。

我不一定希望子任务调度器任务优先于父任务调度器任务(尽管这在未来可能是一个不错的功能)。

我在这里看到了一个相关的问题:有限并发级别的任务调度器(具有任务优先级)处理封装的任务,但我的需求不需要处理异步任务(我所有排队的任务从头到尾都是完全同步的,没有继续)。

有限并发的TaskScheduler,可以交错要显式排序的任务

我认为"完全有序"的意思也是"一次一个"。

在这种情况下,我相信有一个内置的解决方案应该做得很好:ConcurrentExclusiveSchedulerPair

您的"父"调度程序将是一个并发调度程序:

TaskScheduler _parent = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, 8)
     .ConcurrentScheduler;

"子"调度器将是一个独占调度器,它使用下面的并发调度器:

var myScheduler = new ConcurrentExclusiveSchedulerPair(_parent).ExclusiveScheduler;

在仔细考虑了其他答案后,我决定为自己的用途创建一个自定义QueuedTaskScheduler更容易,因为我不需要担心异步任务或IO完成(尽管这给了我一些思考)。

首先,当我们从子工作池中获取工作时,我们在FindNextTask_NeedsLock:中添加了一个基于信号量的锁

var items = queueForTargetTask._workItems;
if (items.Count > 0 
    && queueForTargetTask.TryLock() /* This is added */)
{
    targetTask = items.Dequeue();

对于专用线程版本,在ThreadBasedDispatchLoop:内部

// ... and if we found one, run it
if (targetTask != null)
{
    queueForTargetTask.ExecuteTask(targetTask);
    queueForTargetTask.Release();
}

对于任务调度程序版本,在ProcessPrioritizedAndBatchedTasks:内部

// Now if we finally have a task, run it.  If the task
// was associated with one of the round-robin schedulers, we need to use it
// as a thunk to execute its task.
if (targetTask != null)
{
    if (queueForTargetTask != null)
    {
        queueForTargetTask.ExecuteTask(targetTask);
        queueForTargetTask.Release();
    }
    else
    {
        TryExecuteTask(targetTask);
    }
}

我们在哪里创建新的子队列:

/// <summary>Creates and activates a new scheduling queue for this scheduler.</summary>
/// <returns>The newly created and activated queue at priority 0 and max concurrency of 1.</returns>
public TaskScheduler ActivateNewQueue() { return ActivateNewQueue(0, 1); }
/// <summary>Creates and activates a new scheduling queue for this scheduler.</summary>
/// <param name="priority">The priority level for the new queue.</param>
/// <returns>The newly created and activated queue at the specified priority.</returns>
public TaskScheduler ActivateNewQueue(int priority, int maxConcurrency)
{
    // Create the queue
    var createdQueue = new QueuedTaskSchedulerQueue(priority, maxConcurrency, this);
    ...
}

最后,在嵌套的QueuedTaskSchedulerQueue:中

// This is added.
private readonly int _maxConcurrency;
private readonly Semaphore _semaphore;
internal bool TryLock()
{
    return _semaphore.WaitOne(0);
}
internal void Release()
{
    _semaphore.Release();
    _pool.NotifyNewWorkItem();
}
/// <summary>Initializes the queue.</summary>
/// <param name="priority">The priority associated with this queue.</param>
/// <param name="maxConcurrency">Max concurrency for this scheduler.</param>
/// <param name="pool">The scheduler with which this queue is associated.</param>
internal QueuedTaskSchedulerQueue(int priority, int maxConcurrency, QueuedTaskScheduler pool)
{
    _priority = priority;
    _pool = pool;
    _workItems = new Queue<Task>();
    // This is added.
    _maxConcurrency = maxConcurrency;
    _semaphore = new Semaphore(_maxConcurrency, _maxConcurrency);
}

我希望这对尝试像我一样在一个易于使用的调度程序(可以使用默认线程池或任何其他调度程序)上执行无序任务和有序任务的人来说可能有用。

===更新===

受Stephen Cleary的启发,我最终使用了:

private static readonly Lazy<TaskScheduler> Scheduler = new Lazy<TaskScheduler>(
    () => new WorkStealingTaskScheduler(16));
public static TaskScheduler Default
{
    get
    {
        return Scheduler.Value;
    }
}
public static TaskScheduler CreateNewOrderedTaskScheduler()
{
    return new QueuedTaskScheduler(Default, 1);
}

我知道你的任务有依赖关系,这就是为什么你想(部分)订购它们。你可以用ContinueWith链来做这件事。您只需要跟踪任何给定链中的最新任务。当一个新任务进入时,您设置该任务的下一个延续并存储新任务。你放弃旧的。

替代方案:每条链有一个SemaphoreSlim,并使用await sem.WaitAsync()非常灵活地手动控制DOP。注意,异步等待信号量不会阻塞任何线程。它只会导致少量内存使用。根本没有使用任何操作系统资源。你可以使用大量的信号量。

我不认为调度器是正确的抽象概念。调度器用于基于CPU的工作。其他协调工具可以使用任何Task,包括异步IO。考虑更喜欢普通的任务组合子和协调原语。