如何让多个线程等待单个任务

本文关键字:等待 单个 任务 线程 | 更新日期: 2023-09-27 18:31:40

我已经读过这个:从多个线程等待相同的任务可以吗 - await 线程安全吗? 而且我对答案不清楚,所以这里有一个特定的用例。

我有一个执行一些异步网络 I/O 的方法。 多个线程可以一次命中此方法,我不想将它们全部调用网络请求,如果请求已经在进行中,我想阻止/等待 2nd+ 线程,并在单个 IO 操作完成后让它们全部恢复。

应该如何编写以下别名代码?我猜每个调用线程确实需要获得自己的Task,因此每个都可以获得自己的延续,因此我应该返回一个新的Task,而不是返回currentTask,该由来自DoAsyncNetworkIO的"内部"Task完成。有没有一种干净的方法来做到这一点,还是我必须用手滚动它?

static object mutex = new object();
static Task currentTask;
async Task Fetch()
{
    lock(mutex)
    {
        if(currentTask != null)
            return currentTask;
    }
    currentTask = DoAsyncNetworkIO();
    await currentTask;
    lock(mutex)
    {
        var task = currentTask;
        currentTask = null;
        return task;
    }
}

如何让多个线程等待单个任务

您可以使用SemaphoreSlim来确保只有一个线程实际执行后台线程。

假设您的基本任务(实际执行 IO 的任务)在一个名为 baseTask() 的方法中,我将像这样模拟它:

static async Task baseTask()
{
    Console.WriteLine("Starting long method.");
    await Task.Delay(1000);
    Console.WriteLine("Finished long method.");
}

然后你可以像这样初始化一个SemaphoreSlim,就像初始状态设置为 trueAutoResetEvent

static readonly SemaphoreSlim signal = new SemaphoreSlim(1, 1);

然后将对baseTask()的调用包装在一个方法中,该方法检查signal以查看这是否是尝试运行baseTask()的第一个线程,如下所示:

static async Task<bool> taskWrapper()
{
    bool firstIn = await signal.WaitAsync(0);
    if (firstIn)
    {
        await baseTask();
        signal.Release();
    }
    else
    {
        await signal.WaitAsync();
        signal.Release();
    }
    return firstIn;
}

然后,您的多个线程将等待taskWrapper(),而不是直接等待baseTask()

将其完全放在可编译的控制台应用程序中:

using System;
using System.Threading;
using System.Threading.Tasks;
namespace Demo
{
    static class Program
    {
        static void Main()
        {
            for (int it = 0; it < 10; ++it)
            {
                Console.WriteLine($"'nStarting iteration {it}");
                Task[] tasks = new Task[5];
                for (int i = 0; i < 5; ++i)
                    tasks[i] = Task.Run(demoTask);
                Task.WaitAll(tasks);
            }
            Console.WriteLine("'nFinished");                  
            Console.ReadLine();
        }
        static async Task demoTask()
        {
            int id = Thread.CurrentThread.ManagedThreadId;
            Console.WriteLine($"Thread {id} starting");
            bool firstIn = await taskWrapper();
            Console.WriteLine($"Task {id}: executed: {firstIn}");
        }
        static async Task<bool> taskWrapper()
        {
            bool firstIn = await signal.WaitAsync(0);
            if (firstIn)
            {
                await baseTask();
                signal.Release();
            }
            else
            {
                await signal.WaitAsync();
                signal.Release();
            }
            return firstIn;
        }
        static async Task baseTask()
        {
            Console.WriteLine("Starting long method.");
            await Task.Delay(1000);
            Console.WriteLine("Finished long method.");
        }
        static readonly SemaphoreSlim signal = new SemaphoreSlim(1, 1);
    }
}

(这些方法都是静态的,因为它们位于控制台应用中;在实际代码中,它们将是非静态方法。

await根本不

需要使用延续(Task.ContinueWith类型)。即使这样做,您也可以在一个Task上有多个延续 - 它们不能全部同步运行(如果您有同步上下文,您可能会遇到一些问题)。

请注意,您的伪代码不是线程安全的 - 您不能只在锁之外执行currentTask = DoAsyncNetworkIO();。只有await本身是线程安全的,即使这样,也只是因为您正在等待的Task类以线程安全的方式实现了await协定。任何人都可以写自己的等待者/等待者,所以一定要注意:)