异步任务.WhenAll超时

本文关键字:超时 WhenAll 任务 异步 | 更新日期: 2023-09-27 18:28:55

在新的异步dotnet 4.5库中是否有方法设置Task.WhenAll方法的超时?我想获取几个来源,并在5秒钟后停止,跳过未完成的来源。

异步任务.WhenAll超时

您可以使用Task.WhenAny():将生成的TaskTask.Delay()组合

await Task.WhenAny(Task.WhenAll(tasks), Task.Delay(timeout));

如果您想在超时的情况下收获已完成的任务:

var completedResults =
  tasks
  .Where(t => t.Status == TaskStatus.RanToCompletion)
  .Select(t => t.Result)
  .ToList();

我认为一个更清晰、更健壮的选项,也可以进行异常处理,那就是在每个任务上使用Task.WhenAny和一个超时任务,遍历所有完成的任务并过滤掉超时的任务,并使用await Task.WhenAll()而不是Task.Result来收集所有结果。

这里有一个完整的工作解决方案:

static async Task<TResult[]> WhenAll<TResult>(IEnumerable<Task<TResult>> tasks, TimeSpan timeout)
{
    var timeoutTask = Task.Delay(timeout).ContinueWith(_ => default(TResult));
    var completedTasks = 
        (await Task.WhenAll(tasks.Select(task => Task.WhenAny(task, timeoutTask)))).
        Where(task => task != timeoutTask);
    return await Task.WhenAll(completedTasks);
}

查看;早期救助";以及";任务延迟";Microsoft的"使用基于任务的异步模式"中的部分。

早期救助。由t1表示的操作可以分组为WhenAny有另一个任务t2,我们可以等待WhenAny任务。t2可能表示超时、取消或其他信号将导致WhenAny任务在t1完成之前完成。

您所描述的似乎是一种非常常见的需求,但我在任何地方都找不到这样的例子。我搜索了很多。。。我最终创建了以下内容:

TimeSpan timeout = TimeSpan.FromSeconds(5.0);
Task<Task>[] tasksOfTasks =
{
    Task.WhenAny(SomeTaskAsync("a"), Task.Delay(timeout)),
    Task.WhenAny(SomeTaskAsync("b"), Task.Delay(timeout)),
    Task.WhenAny(SomeTaskAsync("c"), Task.Delay(timeout))
};
Task[] completedTasks = await Task.WhenAll(tasksOfTasks);
List<MyResult> = completedTasks.OfType<Task<MyResult>>().Select(task => task.Result).ToList();

我假设这里有一个方法SomeTaskAsync,它返回Task<MyResult>。

在completedTasks的成员中,只有MyResult类型的任务是我们自己的任务,它们能够争分夺秒。Task。Delay返回不同的类型。这需要在打字方面做出一些妥协,但仍然可以很好地工作,而且非常简单。

(当然,可以使用查询+ToArray动态构建数组)。

  • 请注意,此实现不需要SomeTaskAsync来接收取消令牌

除了超时,我还检查取消,如果您正在构建web应用程序,这很有用。

public static async Task WhenAll(
    IEnumerable<Task> tasks, 
    int millisecondsTimeOut,
    CancellationToken cancellationToken)
{
    using(Task timeoutTask = Task.Delay(millisecondsTimeOut))
    using(Task cancellationMonitorTask = Task.Delay(-1, cancellationToken))
    {
        Task completedTask = await Task.WhenAny(
            Task.WhenAll(tasks), 
            timeoutTask, 
            cancellationMonitorTask
        );
        if (completedTask == timeoutTask)
        {
            throw new TimeoutException();
        }
        if (completedTask == cancellationMonitorTask)
        {
            throw new OperationCanceledException();
        }
        await completedTask;
    }
}

查看中提出的自定义任务组合子http://tutorials.csharp-online.net/Task_Combinators

async static Task<TResult> WithTimeout<TResult> 
   (this Task<TResult> task, TimeSpan timeout)
 {
   Task winner = await (Task.WhenAny 
      (task, Task.Delay (timeout)));
   if (winner != task) throw new TimeoutException();
   return await task; // Unwrap result/re-throw
}

我还没试过。

作废@iArnon答案的结果版本,以及注释和更改第一个参数以使用扩展this。

我还有一个转发方法,使用TimeSpan.FromMilliseconds(millisecondsTimeout)将timeout指定为int,以匹配其他Task方法。

public static async Task WhenAll(this IEnumerable<Task> tasks, TimeSpan timeout)
{
  // Create a timeout task.
  var timeoutTask = Task.Delay(timeout);
  // Get the completed tasks made up of...
  var completedTasks =
  (
    // ...all tasks specified
    await Task.WhenAll(tasks
    // Now finish when its task has finished or the timeout task finishes
    .Select(task => Task.WhenAny(task, timeoutTask)))
  )
  // ...but not the timeout task
  .Where(task => task != timeoutTask);
  // And wait for the internal WhenAll to complete.
  await Task.WhenAll(completedTasks);
}

似乎只需要带有timeout参数的Task.WaitAll重载-如果它返回true,那么你就知道它们都完成了-否则,你可以在IsCompleted上进行筛选。

if (Task.WaitAll(tasks, myTimeout) == false)
{
    tasks = tasks.Where(t => t.IsCompleted);
}
...

我得到了以下一段代码,它可以满足我的需要:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Net.Http;
using System.Json;
using System.Threading;
namespace MyAsync
{
    class Program
    {
        static void Main(string[] args)
        {
            var cts = new CancellationTokenSource();
            Console.WriteLine("Start Main");
            List<Task<List<MyObject>>> listoftasks = new List<Task<List<MyObject>>>();
            listoftasks.Add(GetGoogle(cts));
            listoftasks.Add(GetTwitter(cts));
            listoftasks.Add(GetSleep(cts));
            listoftasks.Add(GetxSleep(cts));
            List<MyObject>[] arrayofanswers = Task.WhenAll(listoftasks).Result;
            List<MyObject> answer = new List<MyObject>();
            foreach (List<MyObject> answers in arrayofanswers)
            {
                answer.AddRange(answers);
            }
            foreach (MyObject o in answer)
            {
                Console.WriteLine("{0} - {1}", o.name, o.origin);
            }
            Console.WriteLine("Press <Enter>");
            Console.ReadLine();
        } 
        static async Task<List<MyObject>> GetGoogle(CancellationTokenSource cts) 
        {
            try
            {
                Console.WriteLine("Start GetGoogle");
                List<MyObject> l = new List<MyObject>();
                var client = new HttpClient();
                Task<HttpResponseMessage> awaitable = client.GetAsync("http://ajax.googleapis.com/ajax/services/search/web?v=1.0&q=broersa", cts.Token);
                HttpResponseMessage res = await awaitable;
                Console.WriteLine("After GetGoogle GetAsync");
                dynamic data = JsonValue.Parse(res.Content.ReadAsStringAsync().Result);
                Console.WriteLine("After GetGoogle ReadAsStringAsync");
                foreach (var r in data.responseData.results)
                {
                    l.Add(new MyObject() { name = r.titleNoFormatting, origin = "google" });
                }
                return l;
            }
            catch (TaskCanceledException)
            {
                return new List<MyObject>();
            }
        }
        static async Task<List<MyObject>> GetTwitter(CancellationTokenSource cts)
        {
            try
            {
                Console.WriteLine("Start GetTwitter");
                List<MyObject> l = new List<MyObject>();
                var client = new HttpClient();
                Task<HttpResponseMessage> awaitable = client.GetAsync("http://search.twitter.com/search.json?q=broersa&rpp=5&include_entities=true&result_type=mixed",cts.Token);
                HttpResponseMessage res = await awaitable;
                Console.WriteLine("After GetTwitter GetAsync");
                dynamic data = JsonValue.Parse(res.Content.ReadAsStringAsync().Result);
                Console.WriteLine("After GetTwitter ReadAsStringAsync");
                foreach (var r in data.results)
                {
                    l.Add(new MyObject() { name = r.text, origin = "twitter" });
                }
                return l;
            }
            catch (TaskCanceledException)
            {
                return new List<MyObject>();
            }
        }
        static async Task<List<MyObject>> GetSleep(CancellationTokenSource cts)
        {
            try
            {
                Console.WriteLine("Start GetSleep");
                List<MyObject> l = new List<MyObject>();
                await Task.Delay(5000,cts.Token);
                l.Add(new MyObject() { name = "Slept well", origin = "sleep" });
                return l;
            }
            catch (TaskCanceledException)
            {
                return new List<MyObject>();
            }
        } 
        static async Task<List<MyObject>> GetxSleep(CancellationTokenSource cts)
        {
            Console.WriteLine("Start GetxSleep");
            List<MyObject> l = new List<MyObject>();
            await Task.Delay(2000);
            cts.Cancel();
            l.Add(new MyObject() { name = "Slept short", origin = "xsleep" });
            return l;
        } 
    }
}

我的解释在我的博客文章中:http://blog.bekijkhet.com/2012/03/c-async-examples-whenall-whenany.html

除了svick的答案外,当我必须等待几项任务完成,但在等待时必须处理其他事情时,以下对我有效:

Task[] TasksToWaitFor = //Your tasks
TimeSpan Timeout = TimeSpan.FromSeconds( 30 );
while( true )
{
    await Task.WhenAny( Task.WhenAll( TasksToWaitFor ), Task.Delay( Timeout ) );
    if( TasksToWaitFor.All( a => a.IsCompleted ) )
        break;
    //Do something else here
}

您可以使用以下代码:

        var timeoutTime = 10;
        var tasksResult = await Task.WhenAll(
                                listOfTasks.Select(x => Task.WhenAny(
                                    x, Task.Delay(TimeSpan.FromMinutes(timeoutTime)))
                                )
                            );

        var succeededtasksResponses = tasksResult
                                               .OfType<Task<MyResult>>()
                                               .Select(task => task.Result);
        if (succeededtasksResponses.Count() != listOfTasks.Count())
        {
            // Not all tasks were completed
            // Throw error or do whatever you want
        }
        //You can use the succeededtasksResponses that contains the list of successful responses

工作原理:

您需要在timeoutTime变量中输入完成所有任务的时间限制。因此,基本上所有任务都将在您在timeoutTime中设置的最长等待时间内等待。当所有任务返回结果时,将不会发生超时,并设置tasksResult。

在那之后,我们只得到完成的任务。未完成的任务将没有结果。

我试图改进优秀的iArnon解决方案,以解决一些小问题,但最终得到了完全不同的实现。我试图解决的两个问题是:

  1. 如果多个任务失败,则传播所有失败任务的错误,而不仅仅是列表中第一个失败任务的错
  2. 防止内存泄漏,以防所有任务的完成速度远远快于timeout。如果在循环中调用WhenAll,并且timeout很大,则泄漏活动的Task.Delay可能会导致不可忽略的内存泄漏量

除此之外,我还添加了一个cancellationToken参数、解释该方法的XML文档以及参数验证。这是:

/// <summary>
/// Returns a task that will complete when all of the tasks have completed,
/// or when the timeout has elapsed, or when the token is canceled, whatever
/// comes first. In case the tasks complete first, the task contains the
/// results/exceptions of all the tasks. In case the timeout elapsed first,
/// the task contains the results/exceptions of the completed tasks only.
/// In case the token is canceled first, the task is canceled. To determine
/// whether a timeout has occured, compare the number of the results with
/// the number of the tasks.
/// </summary>
public static Task<TResult[]> WhenAll<TResult>(
    Task<TResult>[] tasks,
    TimeSpan timeout, CancellationToken cancellationToken = default)
{
    if (tasks == null) throw new ArgumentNullException(nameof(tasks));
    tasks = tasks.ToArray(); // Defensive copy
    if (tasks.Any(t => t == null)) throw new ArgumentException(
        $"The {nameof(tasks)} argument included a null value.", nameof(tasks));
    if (timeout < TimeSpan.Zero && timeout != Timeout.InfiniteTimeSpan)
        throw new ArgumentOutOfRangeException(nameof(timeout));
    if (cancellationToken.IsCancellationRequested)
        return Task.FromCanceled<TResult[]>(cancellationToken);
    var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
    cts.CancelAfter(timeout);
    var continuationOptions = TaskContinuationOptions.DenyChildAttach |
        TaskContinuationOptions.ExecuteSynchronously;
    var continuations = tasks.Select(task => task.ContinueWith(_ => { },
        cts.Token, continuationOptions, TaskScheduler.Default));
    return Task.WhenAll(continuations).ContinueWith(allContinuations =>
    {
        cts.Dispose();
        if (allContinuations.IsCompletedSuccessfully)
            return Task.WhenAll(tasks); // No timeout or cancellation occurred
        Debug.Assert(allContinuations.IsCanceled);
        if (cancellationToken.IsCancellationRequested)
            return Task.FromCanceled<TResult[]>(cancellationToken);
        // Now we know that timeout has occurred
        return Task.WhenAll(tasks.Where(task => task.IsCompleted));
    }, default, continuationOptions, TaskScheduler.Default).Unwrap();
}

WhenAll实现消除了异步和等待,这在一般情况下是不可取的。在这种情况下,为了在未嵌套的AggregateException中传播所有错误,这是必要的。其目的是尽可能准确地模拟内置Task.WhenAll方法的行为。

用法示例:

string[] results;
Task<string[]> whenAllTask = WhenAll(tasks, TimeSpan.FromSeconds(15));
try
{
    results = await whenAllTask;
}
catch when (whenAllTask.IsFaulted) // It might also be canceled
{
    // Log all errors
    foreach (var innerEx in whenAllTask.Exception.InnerExceptions)
    {
        _logger.LogError(innerEx, innerEx.Message);
    }
    throw; // Propagate the error of the first failed task
}
if (results.Length < tasks.Length) throw new TimeoutException();
return results;

注意:上述API存在设计缺陷。如果至少有一项任务失败或被取消,则无法确定是否发生了超时。WhenAll返回的任务的Exception.InnerExceptions属性可能包含所有任务或部分任务的异常,无法判断哪个是哪个。不幸的是,我想不出解决这个问题的办法。