使用 TPL 对单独的调用进行批处理/取消并行化
本文关键字:批处理 取消 并行化 调用 TPL 单独 使用 | 更新日期: 2023-09-27 18:21:44
也许TPL不是正确的工具,但至少从一个不是特别熟悉它的人那里,它似乎应该具有我正在寻找的东西。 不过,我对不使用它的答案持开放态度。
给定这样的方法:
public Task Submit(IEnumerable<WorkItem> work)
这可能会对项目集合执行昂贵的异步操作。 通常,调用方会批处理这些项目并一次提交尽可能多的项目,并且这些批处理之间存在相当长的延迟,因此它的执行效率相当高。
但是,在某些情况下,不会发生外部批处理Submit
,并且会快速连续多次调用少量项目(通常只有一个(,甚至可能从单独的线程同时调用
我想做的是推迟处理(同时累积参数(,直到有一定的时间没有调用,然后按照最初指定的顺序执行整个批处理的操作。
或者换句话说,每次调用该方法时,它都应该将其参数添加到待处理项列表中,然后从零开始重新启动延迟,以便在处理任何内容之前需要一定的空闲时间。
我不想要批处理的大小限制(所以我认为 BatchBlock 不是正确的答案(,我只想要延迟/超时。 我确信调用模式是这样的,在某个时候会有一个空闲期。
我不确定是否最好推迟第一次调用,或者它是否应该立即启动操作,并且仅在操作仍在进行时才推迟后续调用。
如果它使问题更容易,我可以使Submit
返回无效而不是Task
(即无法观察何时完成(。
我敢肯定我可以把这样的东西混在一起,但它似乎是应该已经存在于某个地方的东西。 谁能指出我正确的方向? (不过,我宁愿不使用非核心库。
好的,所以由于找不到合适的东西,我最终自己实现了一些东西。 似乎可以解决问题。 (我实现它的方式比实际代码中显示的更通用,因此我可以更轻松地重用它,但这说明了这个概念。
private readonly ConcurrentQueue<WorkItem> _Items
= new ConcurrentQueue<WorkItem>();
private CancellationTokenSource _CancelSource;
public async Task Submit(IEnumerable<WorkItem> items)
{
var cancel = ReplacePreviousTasks();
foreach (var item in items)
{
_Items.Enqueue(item);
}
await Task.Delay(TimeSpan.FromMilliseconds(250), cancel.Token);
if (!cancel.IsCancellationRequested)
{
await RunOperation();
}
}
private CancellationTokenSource ReplacePreviousTasks()
{
var cancel = new CancellationTokenSource();
var old = Interlocked.Exchange(ref _CancelSource, cancel);
if (old != null)
{
old.Cancel();
}
return cancel;
}
private async Task RunOperation()
{
var items = new List<WorkItem>();
WorkItem item;
while (_Items.TryDequeue(out item))
{
items.Add(item);
}
// do the operation on items
}
如果在 250 毫秒内发生多个提交,则取消较早的提交,并在 250 毫秒启动后对所有项目执行一次操作(从最新提交开始计数(。
如果在操作运行时发生另一个提交,它将继续运行而不会取消(它很有可能从后面的调用中窃取一些项目,但这没关系(。
(从技术上讲,检查cancel.IsCancellationRequested
并不是真正必要的,因为如果在延迟期间取消了上述await
,则会引发异常。 但它并不疼,而且有一个小窗口可能会抓住。