使用 Parallel.For 时出现不可预知的结果
本文关键字:可预知 结果 Parallel For 使用 | 更新日期: 2023-09-27 18:35:16
我正在运行一个并行 for 循环,该循环最初运行时间为 times = 处理器数量并执行长时间运行的操作。每个任务完成后,都会检查更多任务,如果找到,则再次调用自身。
以下是我的代码的样子:
static void Main(string[] args)
{
Int32 numberOfProcessors = Environment.ProcessorCount;
Parallel.For(0, numberOfProcessors, index => DoSomething(index, sqsQueueURL));
}
private async static Task DoSomething(int index, string queueURL)
{
var receiveMessageRequest = new ReceiveMessageRequest { QueueUrl = queueURL, WaitTimeSeconds = 20, MaxNumberOfMessages = 1, VisibilityTimeout = 1200 };
AmazonSQSClient sqsClient = new AmazonSQSClient(new AmazonSQSConfig { MaxErrorRetry = 4 });
var receiveMessageResponse = sqsClient.ReceiveMessage(receiveMessageRequest);
foreach (var msg in receiveMessageResponse.Messages)
{
PerformALongRunningTask......
//delete the message
DeleteMessageRequest deleteMessageRequest = new DeleteMessageRequest(queueURL, msg.ReceiptHandle);
AmazonSQSClient sqsDeleteClient = new AmazonSQSClient();
sqsDeleteClient.DeleteMessage(deleteMessageRequest);
//Do it again
DoSometing(index,queueURL)
}
}
我得到了非常不可预测的结果。它永远不会完成所有任务。它在完成所有内容之前退出。
我在这里做错了什么?
较短的代码:
static Int32 TimesToLoop = 143;
static void Main(string[] args)
{
Int32 numberOfProcessors = Environment.ProcessorCount;
Parallel.For(0, numberOfProcessors, index => DoSomething(index));
Console.Read();
}
private async static Task DoSomething(int index)
{
if(TimesToLoop == 0)
{
return;
}
Console.WriteLine(index);
Interlocked.Decrement(ref TimesToLoop);
DoSomething(index++);
return;
}
我目前看到各种问题:
-
Parallel.For
才刚刚开始任务。它不会等待他们完成。它将等待DoSomething
方法调用返回,但它们返回的是表示异步操作的任务,这些操作可能不会同步完成。 - 正如CarbineCoder所指出的,你的递归几乎肯定是有缺陷的。目前尚不清楚您要实现的目标,但您需要重新考虑这一方面。
- 无论如何,您的递归都不会
await
返回的任务 - 几乎可以肯定。它可能希望创建在foreach
循环中创建的所有任务的集合,并一次性等待所有任务,或者它可能希望立即await
它们。我们说不出来。
修复第一部分的最简单方法可能是使用 Task.WaitAll
而不是 Parallel.For
:
var tasks = Enumerable.Range(0, numberOfProcessors)
.Select(index => DoSomething(index, sqsQueueURL))
.ToList();
Task.WaitAll(tasks);
与Task.WhenAll
不同,Task.WaitAll
将阻塞,直到所有指定的任务完成。请注意,如果任何任务需要在线程调用WaitAll
上继续,这样做是不安全的,正是因为它阻塞了 - 但如果这是一个控制台应用程序并且您从初始线程调用它,你会没事的,因为无论如何,延续都会在线程池上执行。
private async static Task DoSomething(int index, string queueURL)
{
...
foreach (var msg in receiveMessageResponse.Messages)
{
...
//Do it again
DoSometing(index,queueURL)
}
}
您正在递归调用DoSomething
,并且没有条件中断/返回它。这可能会导致堆栈溢出并终止程序。