已完成 Task.Factory.StartNew((() => Parallel.ForEach的事件处理程序

本文关键字:ForEach Parallel 程序 事件处理 Task Factory StartNew 已完成 | 更新日期: 2023-09-27 18:35:27

我想知道一些并行任务何时完成。

我使用此代码在网站上制作了 1500 到 2000 个具有 10 秒 HttpRequest 超时的小型 WebClient.DownloadString:

Task.Factory.StartNew(() => 
    Parallel.ForEach<string>(myKeywords, new ParallelOptions 
    { MaxDegreeOfParallelism = 5 }, getKey));

有时,查询失败,因此存在异常并且函数永远不会完成,并且每个getKey函数内的UI刷新有时似乎被调用两次,因此我无法准确了解完成了多少任务。我正在计算:UI 刷新调用数/关键字总数,并得到 100% 到 250% 之间的结果,我永远不知道任务何时完成。我在很多SO讨论中搜索,但没有一个是直接方法或适合我需求的方法。所以我想框架 4.0 没有提供任何任务.AllDone 事件处理程序或类似的解决方法?

我应该在另一个线程中运行我的 Parallel.Foreach 而不是我的 UI 线程,然后添加它吗?

myTasks.WaitAll

[编辑]

一个临时解决方案是将我的字符串列表复制到 ArrayList 中,然后在每个查询开始时从列表中逐个删除每个项目。无论函数是否运行良好,我都知道何时处理了所有项目。

已完成 Task.Factory.StartNew((() => Parallel.ForEach的事件处理程序

在处理异常时,Parallel.ForEach与其他循环没有什么不同。 如果抛出异常,那么它将停止循环的处理。 这可能就是为什么您看到百分比差异的原因(我假设您可能在处理循环时正在处理计数)。

此外,您实际上并不需要Parallel.ForEach因为您在 WebClient 类上进行的异步调用将阻止等待 IO 完成(网络响应),它们不受计算限制(Parallel.ForEach当你被计算绑定时要好得多)。

也就是说,您应该首先将调用转换为WebClient以使用Task<TResult>。 使用 TaskCompletionSource<TResult> 类,将基于事件的异步模式转换为基于任务的异步模式非常简单。

假设您有一系列Uri实例,这些实例是由于调用 getKey 而生成的,您可以创建一个函数来执行此操作:

static Task<String> DownloadStringAsync(Uri uri)
{
    // Create a WebClient
    var wc = new WebClient();
    // Set up your web client.
    // Create the TaskCompletionSource.
    var tcs = new TaskCompletionSource<string>();
    // Set the event handler on the web client.
    wc.DownloadStringCompleted += (s, e) => {
        // Dispose of the WebClient when done.
        using (wc)
        {
            // Set the task completion source based on the
            // event.
            if (e.Cancelled)
            {
                // Set cancellation.
                tcs.SetCancelled();
                return;
            }
            // Exception?
            if (e.Error != null)
            { 
                // Set exception.
                tcs.SetException(e.Error);
                return;
            }
            // Set result.
            tcs.SetResult(e.Result);
        };
    // Return the task.
    return tcs.Task;
};

请注意,可以优化上述内容以使用一个WebClient,这是留给您的练习(假设您的测试表明您需要它)。

从那里,您可以获得一系列Task<string>

// Gotten from myKeywords
IEnumerable<Uri> uris = ...;
// The tasks.
Task<string>[] tasks = uris.Select(DownloadStringAsync).ToArray();

请注意,必须调用 ToArray 扩展方法才能使任务开始运行。 这是为了绕过延迟执行。 您不必调用 ToArray ,但您必须调用将枚举整个列表并导致任务开始运行的东西。

有了这些Task<string>实例后,可以通过在TaskFactory类上调用 ContinueWhenAll<TAntecedentResult> 方法来等待它们全部完成,如下所示:

Task.Factory.ContinueWhenAll(tasks, a => { }).Wait();

完成此操作后,您可以循环浏览tasks数组并查看Exception和/或Result属性以查看异常或结果是什么。

如果要更新用户界面,则应查看截获对 Enumerable.Select 的调用,即,应在下载完成后调用Task<TResult>上的 ContinueWith<TNewResult> 方法以执行操作,如下所示:

// The tasks.
Task<string>[] tasks = uris.
    Select(DownloadStringAsync).
    // Select receives a Task<T> here, continue that.
    Select(t => t.ContinueWith(t2 => {
        // Do something here: 
        //   - increment a count
        //   - fire an event
        //   - update the UI
        // Note that you have to take care of synchronization here, so
        // make sure to synchronize access to a count, or serialize calls
        // to the UI thread appropriately with a SynchronizationContext.
        ...
        // Return the result, this ensures that you'll have a Task<string>
        // waiting.
        return t2;
    })).
    ToArray();

这将允许您在事情发生时更新它们。 请注意,在上述情况下,如果再次调用Select,则可能需要检查t2的状态并触发一些其他事件,具体取决于您希望错误处理机制是什么。