同步并行任务的正确方法

本文关键字:方法 并行任务 同步 | 更新日期: 2023-09-27 18:00:30

目前我们有这个代码可以正常工作:

Result result1 = null;
Result result2 = null;
var task1 = Task.Factory.StartNew(()=>
{
   var records = DB.Read("..");
   //Do A lot
   result1 = Process(records);  
}); 
var task2 = Task.Factory.StartNew(()=>
{
   var records = DB.Read(".....");
   //Do A lot
   result2 = Process(records);  
});
Task.WaitAll(task1, task2);
var result = Combine(result1, result2);

现在我们想使用数据库函数的异步对应项,我们正在使用这种新模式:

Result result1 = null;
Result result2 = null;
var task1 = await Task.Factory.StartNew( async ()=>
{
   var records = await DB.ReadAsync("..");
   //Do A lot
   result1 = Process(records);  
}); 
var task2 = await Task.Factory.StartNew(async ()=>
{
   var records = await DB.ReadAsync(".....");
   //Do A lot
   result2 = Process(records);  
});
Task.WaitAll(task1, task2);
var result = Combine(result1, result2);

切换到异步后,我们开始观察到异常行为。所以我想知道这是否是并行异步调用的正确模式?

同步并行任务的正确方法

Task.Factory.StartNew是一个预异步API。您应该使用Task.Run,它在设计时考虑了 async-await:

var task1 = await Task.Run( async ()=>
{
   var records = await DB.ReadAsync("..");
   //Do A lot
   result1 = Process(records);  
});

问题是异步 lambda 返回一个Task因此Task.Factory.StartNew返回一个Task<Task>(外部Task.Factory.StartNew因为返回Task,而内部 lambda 是异步 lambda 的结果(。

这意味着当您等待task1task2时,您实际上并不是在等待整个操作,而只是等待它的同步部分。

您可以通过在返回的Task<Task>上使用Task.Unwrap来解决此问题:

Task<Task> task1 = await Task.Factory.StartNew(async ()=>
{
   var records = await DB.ReadAsync("..");
   //Do A lot
   result1 = Process(records);  
});
Task actualTask1 = task1.Unwrap();
await actualTask1;

Task.Run含蓄地为你这样做。


作为旁注,您应该意识到不需要Task.Run来并发执行这些操作。您只需调用这些方法并与Task.When一起等待结果即可:

async Task MainAsync()
{
    var task1 = FooAsync();
    var task2 = BarAsync();
    await Task.WhenAll(task1, task2);
    var result = Combine(task1.Result, task2.Result);
}
async Task<Result> FooAsync()
{
    var records = await DB.ReadAsync("..");
    //Do A lot
    return Process(records);  
}
async Task<Result> BarAsync()
{
    var records = await DB.ReadAsync(".....");
    //Do A lot
    return Process(records);
}

仅当您需要将这些方法的同步部分(第一个await之前的部分(卸载到ThreadPool时,您只需要Task.Run

使用得很好。WaitAll 不是异步编程,因为您实际上在等待时阻止当前线程。你也不打电话.解开包装,这就是为什么你只等待异步 lambda 的创建,而不是异步 lambda 本身。

Task.Run 可以为您解开异步 lambda 的包装。但是有一种更简单、更清洁的方法。

var task1 = DB.ReadAsync("..").ContinueWith(task => {
   //Do A lot
   return Process(task.Result);  
}, TaskScheduler.Default);
var task2 = DB.ReadAsync("..").ContinueWith(task => {
   //Do A lot
   return Process(task.Result);  
}, TaskScheduler.Default);
var result = Combine(await task1, await task2);

这样,您将在准备就绪时获得结果。因此,您根本不需要额外的任务和变量。

请注意,ContinueWith 是一个棘手的函数,如果它不为 null,它适用于 TaskScheduler.Current ,否则它适用于线程池调度程序 TaskScheduler.Default。因此,在调用此函数时始终显式指定调度程序更安全。

同样为了澄清,我没有包括错误检查,因为实际上是数据库。读取异步可以完成,但会出现错误。但这是一件容易的事情,你可以自己处理。

Task.Factory.StartNew启动一个新的任务,执行另一个独立的执行单元。因此,最简单的处理方法可能如下所示:

var task1 = Task.Factory.StartNew(()=> //NO AWAIT
{
   var records = DB.Read("....."); //NO ASYNC
   //Do A lot
   result1 = Process(records);  
});
... another task definition
Task.WaitAll(task1, task2);

在一个任务中按顺序读取和处理,因为您具有数据依赖关系。