如何使用异步倒计时事件,而不是收集任务并等待它们

本文关键字:任务 等待 异步 何使用 倒计时 事件 | 更新日期: 2023-09-27 18:33:53

我有以下代码:

var tasks = await taskSeedSource
    .Select(taskSeed => GetPendingOrRunningTask(taskSeed, createTask, onFailed, onSuccess, sem))
    .ToList()
    .ToTask();
if (tasks.Count == 0)
{
    return;
}
if (tasks.Contains(null))
{
    tasks = tasks.Where(t => t != null).ToArray();
    if (tasks.Count == 0)
    {
        return;
    }
}
await Task.WhenAll(tasks);

其中taskSeedSource是反应式可观察量。可能是这段代码有很多问题,但我至少看到两个:

  1. 正在收集任务,而没有它我也可以。
  2. 不知何故,返回的任务列表可能包含空值,即使GetPendingOrRunningTask是一个async方法,因此永远不会返回null。我不明白为什么会发生这种情况,所以我不得不在不了解问题原因的情况下防御它。

我想使用 AsyncEx 框架中的AsyncCountdownEvent,而不是收集任务然后等待它们。

因此,我可以将倒计时事件传递给GetPendingOrRunningTask 这将立即递增它,并在等待其内部逻辑完成后返回之前发出信号。但是,我不明白如何将倒计时事件集成到 monad 中(这是反应式行话,不是吗?

正确的方法是什么?

编辑

伙计们,让我们忘记返回列表中神秘的空值。假设一切都是绿色的,代码是

var tasks = await taskSeedSource
    .Select(taskSeed => GetPendingOrRunningTask(taskSeed, ...))
    .ToList()
    .ToTask();
await Task.WhenAll(tasks);

现在的问题是我该如何通过倒计时事件做到这一点?所以,假设我有:

var c = new AsyncCountdownEvent(1);

async Task GetPendingOrRunningTask<T>(AsyncCountdownEvent c, T taskSeed, ...)
{
  c.AddCount();
  try
  {
    await ....
  }
  catch (Exception exc)
  {
    // The exception is handled
  }
  c.Signal();  
}

我的问题是我不再需要返回的任务。这些任务收集并等待获得所有工作项结束的时刻,但现在倒计时事件可用于指示工作何时结束。

我的问题是我不确定如何将其集成到反应式链中。从本质上讲,GetPendingOrRunningTask可以async void 。在这里,我被困住了。

编辑 2

任务列表中空条目出现奇怪外观

如何使用异步倒计时事件,而不是收集任务并等待它们

@Servy是正确的,您需要从源头解决空Task问题。没有人愿意回答关于如何解决一个问题,这个问题违反了你自己定义的方法的契约,但还没有提供检查的来源。

至于关于收集任务的问题,如果你的方法返回泛型Task<T>,很容易通过Merge来避免:

await taskSeedSource
  .Select(taskSeed => GetPendingOrRunningTask(taskSeed, createTask, onFailed, onSuccess, sem))
  .Where(task => task != null)  // According to you, this shouldn't be necessary.
  .Merge();

但是,不幸的是,非泛型Task没有官方的Merge重载,但这很容易定义:

public static IObservable<Unit> Merge(this IObservable<Task> sources)
{
  return sources.Select(async source =>
  {
    await source.ConfigureAwait(false);
    return Unit.Default;
  })
  .Merge();
}