以非阻塞方式调用 TaskCompletionSource.SetResult

本文关键字:调用 TaskCompletionSource SetResult 方式 | 更新日期: 2023-09-27 18:36:19

>我发现TaskCompletionSource.SetResult();在返回之前调用等待任务的代码。就我而言,这会导致死锁。

这是一个简化版本,以普通Thread启动

void ReceiverRun()
    while (true)
    {
        var msg = ReadNextMessage();
        TaskCompletionSource<Response> task = requests[msg.RequestID];
        if(msg.Error == null)
            task.SetResult(msg);
        else
            task.SetException(new Exception(msg.Error));
    }
}

代码的"异步"部分如下所示。

await SendAwaitResponse("first message");
SendAwaitResponse("second message").Wait();

等待实际上嵌套在非异步调用中。

发送等待响应(简化)

public static Task<Response> SendAwaitResponse(string msg)
{
    var t = new TaskCompletionSource<Response>();
    requests.Add(GetID(msg), t);
    stream.Write(msg);
    return t.Task;
}

我的假设是,第二个SendAwaitResponse将在ThreadPool线程中执行,但它在为ReceiverRun创建的线程中继续执行。

无论如何,是否可以在不继续其等待的代码的情况下设置任务的结果?

该应用程序是控制台应用程序

以非阻塞方式调用 TaskCompletionSource.SetResult

我发现 TaskCompletionSource.SetResult(); 在返回之前调用等待任务的代码。就我而言,这会导致死锁。

是的,我有一篇博客文章记录了这一点(AFAIK 它没有记录在 MSDN 上)。死锁的发生是由于两件事:

  1. 混合了async和阻塞代码(即,async方法正在调用Wait)。
  2. 使用 TaskContinuationOptions.ExecuteSynchronously 计划任务继续。

我建议从最简单的解决方案开始:删除第一件事(1)。 即,不要混合asyncWait调用:

await SendAwaitResponse("first message");
SendAwaitResponse("second message").Wait();

相反,请始终如一地使用await

await SendAwaitResponse("first message");
await SendAwaitResponse("second message");

如果需要,可以在调用堆栈上更靠前的替代点Wait而不是async 方法中)。

这是我最推荐的解决方案。但是,如果您想尝试删除第二件事 (2),您可以做几个技巧:将SetResult包装在Task.Run中以强制将其强制到单独的线程上(我的 AsyncEx 库有*WithBackgroundContinuations扩展方法可以做到这一点),或者为您的线程提供一个实际的上下文(例如我的AsyncContext类型)并指定ConfigureAwait(false), 这将导致延续忽略ExecuteSynchronously标志。

但这些解决方案比仅仅分离async和阻止代码要复杂得多。

作为旁注,看看TPL Dataflow;听起来你可能会发现它很有用。

由于应用是控制台应用,因此它在默认同步上下文上运行,其中await继续回调将在等待任务完成的同一线程上调用。如果要在await SendAwaitResponse后切换线程,可以使用await Task.Yield()

await SendAwaitResponse("first message");
await Task.Yield(); 
// will be continued on a pool thread
// ...
SendAwaitResponse("second message").Wait(); // so no deadlock

您可以通过将Thread.CurrentThread.ManagedThreadId存储在Task.Result中并将其与await后的当前线程 id 进行比较来进一步改进这一点。如果您仍在同一个线程上,请await Task.Yield() .

虽然我知道SendAwaitResponse是实际代码的简化版本,但它在内部仍然是完全同步的(您在问题中展示的方式)。为什么你会期望那里有任何线程开关?

无论如何,您可能应该重新设计您的逻辑,使其不会假设您当前所在的线程。避免混合awaitTask.Wait(),并使所有代码异步。通常,可以只坚持使用顶层某处(例如Main内部)的一个Wait()

[编辑]ReceiverRun调用task.SetResult(msg)实际上会将控制流传输到您在taskawait的位置 - 由于默认同步上下文的行为,没有线程开关。因此,执行实际消息处理的代码正在接管ReceiverRun线程。最终,SendAwaitResponse("second message").Wait()在同一线程上被调用,从而导致死锁。

下面是以示例为模型的控制台应用代码。它使用 ProcessAsync 内部await Task.Yield()在单独的线程上计划继续,因此控制流返回到 ReceiverRun并且没有死锁。

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApplication
{
    class Program
    {
        class Worker
        {
            public struct Response
            {
                public string message;
                public int threadId;
            }
            CancellationToken _token;
            readonly ConcurrentQueue<string> _messages = new ConcurrentQueue<string>();
            readonly ConcurrentDictionary<string, TaskCompletionSource<Response>> _requests = new ConcurrentDictionary<string, TaskCompletionSource<Response>>();
            public Worker(CancellationToken token)
            {
                _token = token;
            }
            string ReadNextMessage()
            {
                // using Thread.Sleep(100) for test purposes here,
                // should be using ManualResetEvent (or similar synchronization primitive),
                // depending on how messages arrive
                string message;
                while (!_messages.TryDequeue(out message))
                {
                    Thread.Sleep(100);
                    _token.ThrowIfCancellationRequested();
                }
                return message;
            }
            public void ReceiverRun()
            {
                LogThread("Enter ReceiverRun");
                while (true)
                {
                    var msg = ReadNextMessage();
                    LogThread("ReadNextMessage: " + msg);
                    var tcs = _requests[msg];
                    tcs.SetResult(new Response { message = msg, threadId = Thread.CurrentThread.ManagedThreadId });
                    _token.ThrowIfCancellationRequested(); // this is how we terminate the loop
                }
            }
            Task<Response> SendAwaitResponse(string msg)
            {
                LogThread("SendAwaitResponse: " + msg);
                var tcs = new TaskCompletionSource<Response>();
                _requests.TryAdd(msg, tcs);
                _messages.Enqueue(msg);
                return tcs.Task;
            }
            public async Task ProcessAsync()
            {
                LogThread("Enter Worker.ProcessAsync");
                var task1 = SendAwaitResponse("first message");
                await task1;
                LogThread("result1: " + task1.Result.message);
                // avoid deadlock for task2.Wait() with Task.Yield()
                // comment this out and task2.Wait() will dead-lock
                if (task1.Result.threadId == Thread.CurrentThread.ManagedThreadId)
                    await Task.Yield();
                var task2 = SendAwaitResponse("second message");
                task2.Wait();
                LogThread("result2: " + task2.Result.message);
                var task3 = SendAwaitResponse("third message");
                // still on the same thread as with result 2, no deadlock for task3.Wait()
                task3.Wait();
                LogThread("result3: " + task3.Result.message);
                var task4 = SendAwaitResponse("fourth message");
                await task4;
                LogThread("result4: " + task4.Result.message);
                // avoid deadlock for task5.Wait() with Task.Yield()
                // comment this out and task5.Wait() will dead-lock
                if (task4.Result.threadId == Thread.CurrentThread.ManagedThreadId)
                    await Task.Yield();
                var task5 = SendAwaitResponse("fifth message");
                task5.Wait();
                LogThread("result5: " + task5.Result.message);
                LogThread("Leave Worker.ProcessAsync");
            }
            public static void LogThread(string message)
            {
                Console.WriteLine("{0}, thread: {1}", message, Thread.CurrentThread.ManagedThreadId);
            }
        }
        static void Main(string[] args)
        {
            Worker.LogThread("Enter Main");
            var cts = new CancellationTokenSource(5000); // cancel after 5s
            var worker = new Worker(cts.Token);
            Task receiver = Task.Run(() => worker.ReceiverRun());
            Task main = worker.ProcessAsync();
            try
            {
                Task.WaitAll(main, receiver);
            }
            catch (Exception e)
            {
                Console.WriteLine("Exception: " + e.Message);
            }
            Worker.LogThread("Leave Main");
            Console.ReadLine();
        }
    }
}

这与在ReceiverRun内部执行Task.Run(() => task.SetResult(msg))没有太大区别。我能想到的唯一优点是您可以显式控制何时切换线程。这样,您可以尽可能长时间地保持在同一线程上(例如,对于task2task3task4,但您仍然需要在task4后再进行线程切换以避免task5.Wait()上的死锁)。

这两种解决方案最终都会使线程池增长,这在性能和可伸缩性方面很糟糕。

现在,如果我们在上面的代码中ProcessAsync内部各处用await task替换task.Wait(),我们将不必使用await Task.Yield并且仍然不会有死锁。但是,在 ProcessAsync 内部的第一个await task1之后的整个await调用链实际上将在ReceiverRun线程上执行。只要我们不用其他 Wait() 样式的调用阻塞此线程,并且在处理消息时不执行大量 CPU 密集型工作,此方法就可以正常工作(异步 IO 绑定await样式调用仍然应该正常,它们实际上可能会触发隐式线程切换)。

也就是说,我认为您需要一个单独的线程,上面安装了序列化同步上下文来处理消息(类似于WindowsFormsSynchronizationContext)。这是包含awaits的异步代码应运行的位置。您仍然需要避免在该线程上使用Task.Wait。如果单个消息处理需要大量 CPU 密集型工作,则应使用 Task.Run 来完成此类工作。对于异步 IO 绑定调用,可以保持在同一线程上。

你可能想看看@StephenCleary的ActionDispatcher/ActionDispatcherSynchronizationContextNito 异步库,用于异步消息处理逻辑。希望斯蒂芬能跳进来,提供更好的答案。

"我的假设是第二个SendAwaitResponse将在ThreadPool线程中执行,但它在为ReceiverRun创建的线程中继续执行。

这完全取决于您在 SendAwaitResponse 中执行的操作。 异步和并发不是一回事。

退房:C# 5 异步/等待 - 是*并发*吗?

派对有点晚了,但这是我的解决方案,我认为这是附加值。

我也一直在为此苦苦挣扎,我已经通过在等待的方法上捕获同步上下文来解决它。

它看起来像这样:

// just a default sync context
private readonly SynchronizationContext _defaultContext = new SynchronizationContext();
void ReceiverRun()
{
    while (true)    // <-- i would replace this with a cancellation token
    {
        var msg = ReadNextMessage();
        TaskWithContext<TResult> task = requests[msg.RequestID];
        // if it wasn't a winforms/wpf thread, it would be null
        // we choose our default context (threadpool)
        var context = task.Context ?? _defaultContext;
        // execute it on the context which was captured where it was added. So it won't get completed on this thread.
        context.Post(state =>
        {
            if (msg.Error == null)
                task.TaskCompletionSource.SetResult(msg);
            else
                task.TaskCompletionSource.SetException(new Exception(msg.Error));
        });
    }
}
public static Task<Response> SendAwaitResponse(string msg)
{
    // The key is here! Save the current synchronization context.
    var t = new TaskWithContext<Response>(SynchronizationContext.Current); 
    requests.Add(GetID(msg), t);
    stream.Write(msg);
    return t.TaskCompletionSource.Task;
}
// class to hold a task and context
public class TaskWithContext<TResult>
{
    public SynchronizationContext Context { get; }
    public TaskCompletionSource<TResult> TaskCompletionSource { get; } = new TaskCompletionSource<Response>();
    public TaskWithContext(SynchronizationContext context)
    {
        Context = context;
    }
}