平行的.ForEach使用线程.睡眠等价的
本文关键字:线程 ForEach | 更新日期: 2023-09-27 18:10:33
情况是这样的:我需要给一个开始搜索的网站打电话。这个搜索持续了一个未知的时间,我知道搜索是否已经完成的唯一方法是定期查询网站,看看是否有一个"下载数据"链接在它的某个地方(它使用一些奇怪的ajax调用javascript定时器来检查后端和更新页面,我认为)。
技巧是这样的:我有数百个项目需要搜索,一次一个。所以我有一些代码看起来像这样:
var items = getItems();
Parallel.ForEach(items, item =>
{
startSearch(item);
var finished = isSearchFinished(item);
while(finished == false)
{
finished = isSearchFinished(item); //<--- How do I delay this action 30 Secs?
}
downloadData(item);
}
现在,很明显这不是真正的代码,因为可能有一些东西导致isSearchFinished
总是false
。
除了明显的无限循环危险之外,我如何正确地防止isSearchFinished()
反复调用,而是每30秒或1分钟调用一次?
我知道Thread.Sleep()
不是正确的解决方案,我认为解决方案可以通过使用Threading.Timer()
来完成,但我不太熟悉它,并且有这么多线程选项,我只是不确定该使用哪个。
使用任务和async/await
很容易实现,正如@KevinS在评论中所指出的:
async Task<ItemData> ProcessItemAsync(Item item)
{
while (true)
{
if (await isSearchFinishedAsync(item))
break;
await Task.Delay(30 * 1000);
}
return await downloadDataAsync(item);
}
// ...
var items = getItems();
var tasks = items.Select(i => ProcessItemAsync(i)).ToArray();
await Task.WhenAll(tasks);
var data = tasks.Select(t = > t.Result);
这样,您就不会因为大多数I/o绑定的网络操作而徒劳地阻塞ThreadPool
线程。如果您不熟悉async/await
, async-await
标记wiki可能是一个很好的起点。
我假设你可以将你的同步方法isSearchFinished
和downloadData
转换为异步版本,使用像HttpClient
这样的非阻塞HTTP请求并返回Task<>
。如果您不能这样做,您仍然可以简单地用Task.Run
包装它们,作为await Task.Run(() => isSearchFinished(item))
和await Task.Run(() => downloadData(item))
。通常不建议这样做,但是当您有数百个项目时,在这种情况下,它仍然会为您提供比Parallel.ForEach
更好的并发级别,因为您不会阻塞池线程30秒,这要归功于异步Task.Delay
。
您还可以使用TaskCompletionSource
和Threading.Timer
编写泛型函数,以返回指定重试函数成功后变为完整的Task
。
public static Task RetryAsync(Func<bool> retryFunc, TimeSpan retryInterval)
{
return RetryAsync(retryFunc, retryInterval, CancellationToken.None);
}
public static Task RetryAsync(Func<bool> retryFunc, TimeSpan retryInterval, CancellationToken cancellationToken)
{
var tcs = new TaskCompletionSource<object>();
cancellationToken.Register(() => tcs.TrySetCanceled());
var timer = new Timer((state) =>
{
var taskCompletionSource = (TaskCompletionSource<object>) state;
try
{
if (retryFunc())
{
taskCompletionSource.TrySetResult(null);
}
}
catch (Exception ex)
{
taskCompletionSource.TrySetException(ex);
}
}, tcs, TimeSpan.FromMilliseconds(0), retryInterval);
// Once the task is complete, dispose of the timer so it doesn't keep firing. Also captures the timer
// in a closure so it does not get disposed.
tcs.Task.ContinueWith(t => timer.Dispose(),
CancellationToken.None,
TaskContinuationOptions.ExecuteSynchronously,
TaskScheduler.Default);
return tcs.Task;
}
可以这样使用RetryAsync
:
var searchTasks = new List<Task>();
searchTasks.AddRange(items.Select(
downloadItem => RetryAsync( () => isSearchFinished(downloadItem), TimeSpan.FromSeconds(2)) // retry timout
.ContinueWith(t => downloadData(downloadItem),
CancellationToken.None,
TaskContinuationOptions.OnlyOnRanToCompletion,
TaskScheduler.Default)));
await Task.WhenAll(searchTasks.ToArray());
ContinueWith
部分指定任务成功完成后要做什么。在这种情况下,它将在线程池线程上运行downloadData
方法,因为我们指定了TaskScheduler.Default
,并且只有在任务运行到完成时才会执行延续,即它没有被取消,也没有抛出异常。