已完成 Task.Factory.StartNew((() => Parallel.ForEach的事件处理程序
本文关键字:ForEach Parallel 程序 事件处理 Task Factory StartNew 已完成 | 更新日期: 2023-09-27 18:35:27
我想知道一些并行任务何时完成。
我使用此代码在网站上制作了 1500 到 2000 个具有 10 秒 HttpRequest 超时的小型 WebClient.DownloadString:
Task.Factory.StartNew(() =>
Parallel.ForEach<string>(myKeywords, new ParallelOptions
{ MaxDegreeOfParallelism = 5 }, getKey));
有时,查询失败,因此存在异常并且函数永远不会完成,并且每个getKey函数内的UI刷新有时似乎被调用两次,因此我无法准确了解完成了多少任务。我正在计算:UI 刷新调用数/关键字总数,并得到 100% 到 250% 之间的结果,我永远不知道任务何时完成。我在很多SO讨论中搜索,但没有一个是直接方法或适合我需求的方法。所以我想框架 4.0 没有提供任何任务.AllDone 事件处理程序或类似的解决方法?
我应该在另一个线程中运行我的 Parallel.Foreach 而不是我的 UI 线程,然后添加它吗?
myTasks.WaitAll
[编辑]
一个临时解决方案是将我的字符串列表复制到 ArrayList 中,然后在每个查询开始时从列表中逐个删除每个项目。无论函数是否运行良好,我都知道何时处理了所有项目。
在处理异常时,Parallel.ForEach
与其他循环没有什么不同。 如果抛出异常,那么它将停止循环的处理。 这可能就是为什么您看到百分比差异的原因(我假设您可能在处理循环时正在处理计数)。
此外,您实际上并不需要Parallel.ForEach
因为您在 WebClient
类上进行的异步调用将阻止等待 IO 完成(网络响应),它们不受计算限制(Parallel.ForEach
当你被计算绑定时要好得多)。
也就是说,您应该首先将调用转换为WebClient
以使用Task<TResult>
。 使用 TaskCompletionSource<TResult>
类,将基于事件的异步模式转换为基于任务的异步模式非常简单。
假设您有一系列Uri
实例,这些实例是由于调用 getKey
而生成的,您可以创建一个函数来执行此操作:
static Task<String> DownloadStringAsync(Uri uri)
{
// Create a WebClient
var wc = new WebClient();
// Set up your web client.
// Create the TaskCompletionSource.
var tcs = new TaskCompletionSource<string>();
// Set the event handler on the web client.
wc.DownloadStringCompleted += (s, e) => {
// Dispose of the WebClient when done.
using (wc)
{
// Set the task completion source based on the
// event.
if (e.Cancelled)
{
// Set cancellation.
tcs.SetCancelled();
return;
}
// Exception?
if (e.Error != null)
{
// Set exception.
tcs.SetException(e.Error);
return;
}
// Set result.
tcs.SetResult(e.Result);
};
// Return the task.
return tcs.Task;
};
请注意,可以优化上述内容以使用一个WebClient
,这是留给您的练习(假设您的测试表明您需要它)。
从那里,您可以获得一系列Task<string>
:
// Gotten from myKeywords
IEnumerable<Uri> uris = ...;
// The tasks.
Task<string>[] tasks = uris.Select(DownloadStringAsync).ToArray();
请注意,必须调用 ToArray
扩展方法才能使任务开始运行。 这是为了绕过延迟执行。 您不必调用 ToArray
,但您必须调用将枚举整个列表并导致任务开始运行的东西。
有了这些Task<string>
实例后,可以通过在TaskFactory
类上调用 ContinueWhenAll<TAntecedentResult>
方法来等待它们全部完成,如下所示:
Task.Factory.ContinueWhenAll(tasks, a => { }).Wait();
完成此操作后,您可以循环浏览tasks
数组并查看Exception
和/或Result
属性以查看异常或结果是什么。
如果要更新用户界面,则应查看截获对 Enumerable.Select 的调用,即,应在下载完成后调用Task<TResult>
上的 ContinueWith<TNewResult>
方法以执行操作,如下所示:
// The tasks.
Task<string>[] tasks = uris.
Select(DownloadStringAsync).
// Select receives a Task<T> here, continue that.
Select(t => t.ContinueWith(t2 => {
// Do something here:
// - increment a count
// - fire an event
// - update the UI
// Note that you have to take care of synchronization here, so
// make sure to synchronize access to a count, or serialize calls
// to the UI thread appropriately with a SynchronizationContext.
...
// Return the result, this ensures that you'll have a Task<string>
// waiting.
return t2;
})).
ToArray();
这将允许您在事情发生时更新它们。 请注意,在上述情况下,如果再次调用Select
,则可能需要检查t2
的状态并触发一些其他事件,具体取决于您希望错误处理机制是什么。