如何让多个线程等待单个任务
本文关键字:等待 单个 任务 线程 | 更新日期: 2023-09-27 18:31:40
我已经读过这个:从多个线程等待相同的任务可以吗 - await 线程安全吗? 而且我对答案不清楚,所以这里有一个特定的用例。
我有一个执行一些异步网络 I/O 的方法。 多个线程可以一次命中此方法,我不想将它们全部调用网络请求,如果请求已经在进行中,我想阻止/等待 2nd+ 线程,并在单个 IO 操作完成后让它们全部恢复。
我应该如何编写以下别名代码?我猜每个调用线程确实需要获得自己的Task
,因此每个都可以获得自己的延续,因此我应该返回一个新的Task
,而不是返回currentTask
,该由来自DoAsyncNetworkIO
的"内部"Task
完成。有没有一种干净的方法来做到这一点,还是我必须用手滚动它?
static object mutex = new object();
static Task currentTask;
async Task Fetch()
{
lock(mutex)
{
if(currentTask != null)
return currentTask;
}
currentTask = DoAsyncNetworkIO();
await currentTask;
lock(mutex)
{
var task = currentTask;
currentTask = null;
return task;
}
}
您可以使用SemaphoreSlim
来确保只有一个线程实际执行后台线程。
假设您的基本任务(实际执行 IO 的任务)在一个名为 baseTask()
的方法中,我将像这样模拟它:
static async Task baseTask()
{
Console.WriteLine("Starting long method.");
await Task.Delay(1000);
Console.WriteLine("Finished long method.");
}
然后你可以像这样初始化一个SemaphoreSlim
,就像初始状态设置为 true
的AutoResetEvent
:
static readonly SemaphoreSlim signal = new SemaphoreSlim(1, 1);
然后将对baseTask()
的调用包装在一个方法中,该方法检查signal
以查看这是否是尝试运行baseTask()
的第一个线程,如下所示:
static async Task<bool> taskWrapper()
{
bool firstIn = await signal.WaitAsync(0);
if (firstIn)
{
await baseTask();
signal.Release();
}
else
{
await signal.WaitAsync();
signal.Release();
}
return firstIn;
}
然后,您的多个线程将等待taskWrapper()
,而不是直接等待baseTask()
。
将其完全放在可编译的控制台应用程序中:
using System;
using System.Threading;
using System.Threading.Tasks;
namespace Demo
{
static class Program
{
static void Main()
{
for (int it = 0; it < 10; ++it)
{
Console.WriteLine($"'nStarting iteration {it}");
Task[] tasks = new Task[5];
for (int i = 0; i < 5; ++i)
tasks[i] = Task.Run(demoTask);
Task.WaitAll(tasks);
}
Console.WriteLine("'nFinished");
Console.ReadLine();
}
static async Task demoTask()
{
int id = Thread.CurrentThread.ManagedThreadId;
Console.WriteLine($"Thread {id} starting");
bool firstIn = await taskWrapper();
Console.WriteLine($"Task {id}: executed: {firstIn}");
}
static async Task<bool> taskWrapper()
{
bool firstIn = await signal.WaitAsync(0);
if (firstIn)
{
await baseTask();
signal.Release();
}
else
{
await signal.WaitAsync();
signal.Release();
}
return firstIn;
}
static async Task baseTask()
{
Console.WriteLine("Starting long method.");
await Task.Delay(1000);
Console.WriteLine("Finished long method.");
}
static readonly SemaphoreSlim signal = new SemaphoreSlim(1, 1);
}
}
(这些方法都是静态的,因为它们位于控制台应用中;在实际代码中,它们将是非静态方法。
await
根本不需要使用延续(Task.ContinueWith
类型)。即使这样做,您也可以在一个Task
上有多个延续 - 它们不能全部同步运行(如果您有同步上下文,您可能会遇到一些问题)。
请注意,您的伪代码不是线程安全的 - 您不能只在锁之外执行currentTask = DoAsyncNetworkIO();
。只有await
本身是线程安全的,即使这样,也只是因为您正在等待的Task
类以线程安全的方式实现了await
协定。任何人都可以写自己的等待者/等待者,所以一定要注意:)