平行的.ForEach使用线程.睡眠等价的

本文关键字:线程 ForEach | 更新日期: 2023-09-27 18:10:33

情况是这样的:我需要给一个开始搜索的网站打电话。这个搜索持续了一个未知的时间,我知道搜索是否已经完成的唯一方法是定期查询网站,看看是否有一个"下载数据"链接在它的某个地方(它使用一些奇怪的ajax调用javascript定时器来检查后端和更新页面,我认为)。

技巧是这样的:我有数百个项目需要搜索,一次一个。所以我有一些代码看起来像这样:

var items = getItems();
Parallel.ForEach(items, item =>
{
   startSearch(item);
   var finished = isSearchFinished(item);
   while(finished == false)
   {
      finished = isSearchFinished(item); //<--- How do I delay this action 30 Secs?
   }
   downloadData(item);
}

现在,很明显这不是真正的代码,因为可能有一些东西导致isSearchFinished总是false

除了明显的无限循环危险之外,我如何正确地防止isSearchFinished()反复调用,而是每30秒或1分钟调用一次?

我知道Thread.Sleep()不是正确的解决方案,我认为解决方案可以通过使用Threading.Timer()来完成,但我不太熟悉它,并且有这么多线程选项,我只是不确定该使用哪个。

平行的.ForEach使用线程.睡眠等价的

使用任务和async/await很容易实现,正如@KevinS在评论中所指出的:

async Task<ItemData> ProcessItemAsync(Item item)
{
    while (true)
    {
        if (await isSearchFinishedAsync(item))
            break;
        await Task.Delay(30 * 1000);
    }
    return await downloadDataAsync(item);
}
// ...
var items = getItems();
var tasks = items.Select(i => ProcessItemAsync(i)).ToArray();
await Task.WhenAll(tasks);
var data = tasks.Select(t = > t.Result);

这样,您就不会因为大多数I/o绑定的网络操作而徒劳地阻塞ThreadPool线程。如果您不熟悉async/await, async-await标记wiki可能是一个很好的起点。

我假设你可以将你的同步方法isSearchFinisheddownloadData转换为异步版本,使用像HttpClient这样的非阻塞HTTP请求并返回Task<>。如果您不能这样做,您仍然可以简单地用Task.Run包装它们,作为await Task.Run(() => isSearchFinished(item))await Task.Run(() => downloadData(item))。通常不建议这样做,但是当您有数百个项目时,在这种情况下,它仍然会为您提供比Parallel.ForEach更好的并发级别,因为您不会阻塞池线程30秒,这要归功于异步Task.Delay

您还可以使用TaskCompletionSourceThreading.Timer编写泛型函数,以返回指定重试函数成功后变为完整的Task

public static Task RetryAsync(Func<bool> retryFunc, TimeSpan retryInterval)
{
    return RetryAsync(retryFunc, retryInterval, CancellationToken.None);
}
public static Task RetryAsync(Func<bool> retryFunc, TimeSpan retryInterval, CancellationToken cancellationToken)
{
    var tcs = new TaskCompletionSource<object>();
    cancellationToken.Register(() => tcs.TrySetCanceled());
    var timer = new Timer((state) =>
    {
        var taskCompletionSource = (TaskCompletionSource<object>) state;
        try
        {                   
            if (retryFunc())
            {
                taskCompletionSource.TrySetResult(null);
            }
        }
        catch (Exception ex)
        {
            taskCompletionSource.TrySetException(ex);
        }
    }, tcs, TimeSpan.FromMilliseconds(0), retryInterval);
    // Once the task is complete, dispose of the timer so it doesn't keep firing. Also captures the timer
    // in a closure so it does not get disposed.
    tcs.Task.ContinueWith(t => timer.Dispose(),
                          CancellationToken.None,
                          TaskContinuationOptions.ExecuteSynchronously,
                          TaskScheduler.Default);
    return tcs.Task;
}

可以这样使用RetryAsync:

var searchTasks = new List<Task>();
searchTasks.AddRange(items.Select(
        downloadItem => RetryAsync( () => isSearchFinished(downloadItem),  TimeSpan.FromSeconds(2))  // retry timout
        .ContinueWith(t => downloadData(downloadItem), 
                      CancellationToken.None, 
                      TaskContinuationOptions.OnlyOnRanToCompletion, 
                      TaskScheduler.Default)));
await Task.WhenAll(searchTasks.ToArray());

ContinueWith部分指定任务成功完成后要做什么。在这种情况下,它将在线程池线程上运行downloadData方法,因为我们指定了TaskScheduler.Default,并且只有在任务运行到完成时才会执行延续,即它没有被取消,也没有抛出异常。