使用 TPL 对单独的调用进行批处理/取消并行化

本文关键字:批处理 取消 并行化 调用 TPL 单独 使用 | 更新日期: 2023-09-27 18:21:44

也许TPL不是正确的工具,但至少从一个不是特别熟悉它的人那里,它似乎应该具有我正在寻找的东西。 不过,我对不使用它的答案持开放态度。

给定这样的方法:

public Task Submit(IEnumerable<WorkItem> work)

这可能会对项目集合执行昂贵的异步操作。 通常,调用方会批处理这些项目并一次提交尽可能多的项目,并且这些批处理之间存在相当长的延迟,因此它的执行效率相当高。

但是,在某些情况下,不会发生外部批处理Submit,并且会快速连续多次调用少量项目(通常只有一个(,甚至可能从单独的线程同时调用

我想做的是推迟处理(同时累积参数(,直到有一定的时间没有调用,然后按照最初指定的顺序执行整个批处理的操作。

或者换句话说,每次调用该方法时,它都应该将其参数添加到待处理项列表中,然后从零开始重新启动延迟,以便在处理任何内容之前需要一定的空闲时间。

我不想要批处理的大小限制(所以我认为 BatchBlock 不是正确的答案(,我只想要延迟/超时。 我确信调用模式是这样的,在某个时候有一个空闲期。

我不确定是否最好推迟第一次调用,或者它是否应该立即启动操作,并且仅在操作仍在进行时才推迟后续调用。

如果它使问题更容易,我可以使Submit返回无效而不是Task(即无法观察何时完成(。

我敢肯定我可以把这样的东西混在一起,但它似乎是应该已经存在于某个地方的东西。 谁能指出我正确的方向? (不过,我宁愿不使用非核心库。

使用 TPL 对单独的调用进行批处理/取消并行化

好的,所以由于找不到合适的东西,我最终自己实现了一些东西。 似乎可以解决问题。 (我实现它的方式比实际代码中显示的更通用,因此我可以更轻松地重用它,但这说明了这个概念。

private readonly ConcurrentQueue<WorkItem> _Items
    = new ConcurrentQueue<WorkItem>();
private CancellationTokenSource _CancelSource;
public async Task Submit(IEnumerable<WorkItem> items)
{
    var cancel = ReplacePreviousTasks();
    foreach (var item in items)
    {
        _Items.Enqueue(item);
    }
    await Task.Delay(TimeSpan.FromMilliseconds(250), cancel.Token);
    if (!cancel.IsCancellationRequested)
    {
        await RunOperation();
    }
}
private CancellationTokenSource ReplacePreviousTasks()
{
    var cancel = new CancellationTokenSource();
    var old = Interlocked.Exchange(ref _CancelSource, cancel);
    if (old != null)
    {
        old.Cancel();
    }
    return cancel;
}
private async Task RunOperation()
{
    var items = new List<WorkItem>();
    WorkItem item;
    while (_Items.TryDequeue(out item))
    {
        items.Add(item);
    }
    // do the operation on items
}
如果在 250 毫秒

内发生多个提交,则取消较早的提交,并在 250 毫秒启动后对所有项目执行一次操作(从最新提交开始计数(。

如果在操作运行时发生另一个提交,它将继续运行而不会取消(它很有可能从后面的调用中窃取一些项目,但这没关系(。

(从技术上讲,检查cancel.IsCancellationRequested并不是真正必要的,因为如果在延迟期间取消了上述await,则会引发异常。 但它并不疼,而且有一个小窗口可能会抓住。