使用 Parallel.For 时出现不可预知的结果

本文关键字:可预知 结果 Parallel For 使用 | 更新日期: 2023-09-27 18:35:16

我正在运行一个并行 for 循环,该循环最初运行时间为 times = 处理器数量并执行长时间运行的操作。每个任务完成后,都会检查更多任务,如果找到,则再次调用自身。

以下是我的代码的样子:

static void Main(string[] args)
{
   Int32 numberOfProcessors = Environment.ProcessorCount;
   Parallel.For(0, numberOfProcessors, index => DoSomething(index, sqsQueueURL));
}
private async static Task DoSomething(int index, string queueURL)
{
   var receiveMessageRequest = new ReceiveMessageRequest { QueueUrl = queueURL, WaitTimeSeconds = 20, MaxNumberOfMessages = 1, VisibilityTimeout = 1200 };
   AmazonSQSClient sqsClient = new AmazonSQSClient(new AmazonSQSConfig { MaxErrorRetry = 4 });
   var receiveMessageResponse = sqsClient.ReceiveMessage(receiveMessageRequest);
   foreach (var msg in receiveMessageResponse.Messages)
   {
      PerformALongRunningTask......
      //delete the message
      DeleteMessageRequest deleteMessageRequest = new DeleteMessageRequest(queueURL, msg.ReceiptHandle);
      AmazonSQSClient sqsDeleteClient = new AmazonSQSClient();
      sqsDeleteClient.DeleteMessage(deleteMessageRequest);
      //Do it again
      DoSometing(index,queueURL)
   }
}

我得到了非常不可预测的结果。它永远不会完成所有任务。它在完成所有内容之前退出。

我在这里做错了什么?

较短的代码:

static Int32 TimesToLoop = 143;
static void Main(string[] args)
{
    Int32 numberOfProcessors = Environment.ProcessorCount;
    Parallel.For(0, numberOfProcessors, index => DoSomething(index));
    Console.Read();
}
private async static Task DoSomething(int index)
{
    if(TimesToLoop == 0)
    {
        return;
    }
    Console.WriteLine(index);
    Interlocked.Decrement(ref TimesToLoop);
    DoSomething(index++);
    return;
}

使用 Parallel.For 时出现不可预知的结果

我目前看到各种问题:

  • Parallel.For才刚刚开始任务。它不会等待他们完成。它将等待DoSomething方法调用返回,但它们返回的是表示异步操作的任务,这些操作可能不会同步完成。
  • 正如CarbineCoder所指出的,你的递归几乎肯定是有缺陷的。目前尚不清楚您要实现的目标,但您需要重新考虑这一方面。
  • 无论如何,您的递归都不会await返回的任务 - 几乎可以肯定。它可能希望创建在foreach循环中创建的所有任务的集合,并一次性等待所有任务,或者它可能希望立即await它们。我们说不出来。

修复第一部分的最简单方法可能是使用 Task.WaitAll 而不是 Parallel.For

var tasks = Enumerable.Range(0, numberOfProcessors)
                      .Select(index => DoSomething(index, sqsQueueURL))
                      .ToList();
Task.WaitAll(tasks);

Task.WhenAll不同,Task.WaitAll将阻塞,直到所有指定的任务完成。请注意,如果任何任务需要在线程调用WaitAll上继续,这样做是不安全的,正是因为它阻塞了 - 但如果这是一个控制台应用程序并且您从初始线程调用它,你会没事的,因为无论如何,延续都会在线程池上执行。

private async static Task DoSomething(int index, string queueURL)
{
   ...
   foreach (var msg in receiveMessageResponse.Messages)
   {
      ...
      //Do it again
      DoSometing(index,queueURL)
   }
}

您正在递归调用DoSomething,并且没有条件中断/返回它。这可能会导致堆栈溢出并终止程序。