以非阻塞方式调用 TaskCompletionSource.SetResult
本文关键字:调用 TaskCompletionSource SetResult 方式 | 更新日期: 2023-09-27 18:36:19
>我发现TaskCompletionSource.SetResult();
在返回之前调用等待任务的代码。就我而言,这会导致死锁。
这是一个简化版本,以普通Thread
启动
void ReceiverRun()
while (true)
{
var msg = ReadNextMessage();
TaskCompletionSource<Response> task = requests[msg.RequestID];
if(msg.Error == null)
task.SetResult(msg);
else
task.SetException(new Exception(msg.Error));
}
}
代码的"异步"部分如下所示。
await SendAwaitResponse("first message");
SendAwaitResponse("second message").Wait();
等待实际上嵌套在非异步调用中。
发送等待响应(简化)
public static Task<Response> SendAwaitResponse(string msg)
{
var t = new TaskCompletionSource<Response>();
requests.Add(GetID(msg), t);
stream.Write(msg);
return t.Task;
}
我的假设是,第二个SendAwaitResponse将在ThreadPool线程中执行,但它在为ReceiverRun创建的线程中继续执行。
无论如何,是否可以在不继续其等待的代码的情况下设置任务的结果?
该应用程序是控制台应用程序。
我发现 TaskCompletionSource.SetResult(); 在返回之前调用等待任务的代码。就我而言,这会导致死锁。
是的,我有一篇博客文章记录了这一点(AFAIK 它没有记录在 MSDN 上)。死锁的发生是由于两件事:
- 混合了
async
和阻塞代码(即,async
方法正在调用Wait
)。 - 使用
TaskContinuationOptions.ExecuteSynchronously
计划任务继续。
我建议从最简单的解决方案开始:删除第一件事(1)。 即,不要混合async
和Wait
调用:
await SendAwaitResponse("first message");
SendAwaitResponse("second message").Wait();
相反,请始终如一地使用await
:
await SendAwaitResponse("first message");
await SendAwaitResponse("second message");
如果需要,可以在调用堆栈上更靠前的替代点Wait
(而不是在 async
方法中)。
这是我最推荐的解决方案。但是,如果您想尝试删除第二件事 (2),您可以做几个技巧:将SetResult
包装在Task.Run
中以强制将其强制到单独的线程上(我的 AsyncEx 库有*WithBackgroundContinuations
扩展方法可以做到这一点),或者为您的线程提供一个实际的上下文(例如我的AsyncContext
类型)并指定ConfigureAwait(false)
, 这将导致延续忽略ExecuteSynchronously
标志。
但这些解决方案比仅仅分离async
和阻止代码要复杂得多。
作为旁注,看看TPL Dataflow;听起来你可能会发现它很有用。
由于应用是控制台应用,因此它在默认同步上下文上运行,其中await
继续回调将在等待任务完成的同一线程上调用。如果要在await SendAwaitResponse
后切换线程,可以使用await Task.Yield()
await SendAwaitResponse("first message");
await Task.Yield();
// will be continued on a pool thread
// ...
SendAwaitResponse("second message").Wait(); // so no deadlock
您可以通过将Thread.CurrentThread.ManagedThreadId
存储在Task.Result
中并将其与await
后的当前线程 id 进行比较来进一步改进这一点。如果您仍在同一个线程上,请await Task.Yield()
.
虽然我知道SendAwaitResponse
是实际代码的简化版本,但它在内部仍然是完全同步的(您在问题中展示的方式)。为什么你会期望那里有任何线程开关?
无论如何,您可能应该重新设计您的逻辑,使其不会假设您当前所在的线程。避免混合await
和Task.Wait()
,并使所有代码异步。通常,可以只坚持使用顶层某处(例如Main
内部)的一个Wait()
。
[编辑] 从ReceiverRun
调用task.SetResult(msg)
实际上会将控制流传输到您在task
上await
的位置 - 由于默认同步上下文的行为,没有线程开关。因此,执行实际消息处理的代码正在接管ReceiverRun
线程。最终,SendAwaitResponse("second message").Wait()
在同一线程上被调用,从而导致死锁。
下面是以示例为模型的控制台应用代码。它使用 ProcessAsync
内部await Task.Yield()
在单独的线程上计划继续,因此控制流返回到 ReceiverRun
并且没有死锁。
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApplication
{
class Program
{
class Worker
{
public struct Response
{
public string message;
public int threadId;
}
CancellationToken _token;
readonly ConcurrentQueue<string> _messages = new ConcurrentQueue<string>();
readonly ConcurrentDictionary<string, TaskCompletionSource<Response>> _requests = new ConcurrentDictionary<string, TaskCompletionSource<Response>>();
public Worker(CancellationToken token)
{
_token = token;
}
string ReadNextMessage()
{
// using Thread.Sleep(100) for test purposes here,
// should be using ManualResetEvent (or similar synchronization primitive),
// depending on how messages arrive
string message;
while (!_messages.TryDequeue(out message))
{
Thread.Sleep(100);
_token.ThrowIfCancellationRequested();
}
return message;
}
public void ReceiverRun()
{
LogThread("Enter ReceiverRun");
while (true)
{
var msg = ReadNextMessage();
LogThread("ReadNextMessage: " + msg);
var tcs = _requests[msg];
tcs.SetResult(new Response { message = msg, threadId = Thread.CurrentThread.ManagedThreadId });
_token.ThrowIfCancellationRequested(); // this is how we terminate the loop
}
}
Task<Response> SendAwaitResponse(string msg)
{
LogThread("SendAwaitResponse: " + msg);
var tcs = new TaskCompletionSource<Response>();
_requests.TryAdd(msg, tcs);
_messages.Enqueue(msg);
return tcs.Task;
}
public async Task ProcessAsync()
{
LogThread("Enter Worker.ProcessAsync");
var task1 = SendAwaitResponse("first message");
await task1;
LogThread("result1: " + task1.Result.message);
// avoid deadlock for task2.Wait() with Task.Yield()
// comment this out and task2.Wait() will dead-lock
if (task1.Result.threadId == Thread.CurrentThread.ManagedThreadId)
await Task.Yield();
var task2 = SendAwaitResponse("second message");
task2.Wait();
LogThread("result2: " + task2.Result.message);
var task3 = SendAwaitResponse("third message");
// still on the same thread as with result 2, no deadlock for task3.Wait()
task3.Wait();
LogThread("result3: " + task3.Result.message);
var task4 = SendAwaitResponse("fourth message");
await task4;
LogThread("result4: " + task4.Result.message);
// avoid deadlock for task5.Wait() with Task.Yield()
// comment this out and task5.Wait() will dead-lock
if (task4.Result.threadId == Thread.CurrentThread.ManagedThreadId)
await Task.Yield();
var task5 = SendAwaitResponse("fifth message");
task5.Wait();
LogThread("result5: " + task5.Result.message);
LogThread("Leave Worker.ProcessAsync");
}
public static void LogThread(string message)
{
Console.WriteLine("{0}, thread: {1}", message, Thread.CurrentThread.ManagedThreadId);
}
}
static void Main(string[] args)
{
Worker.LogThread("Enter Main");
var cts = new CancellationTokenSource(5000); // cancel after 5s
var worker = new Worker(cts.Token);
Task receiver = Task.Run(() => worker.ReceiverRun());
Task main = worker.ProcessAsync();
try
{
Task.WaitAll(main, receiver);
}
catch (Exception e)
{
Console.WriteLine("Exception: " + e.Message);
}
Worker.LogThread("Leave Main");
Console.ReadLine();
}
}
}
这与在ReceiverRun
内部执行Task.Run(() => task.SetResult(msg))
没有太大区别。我能想到的唯一优点是您可以显式控制何时切换线程。这样,您可以尽可能长时间地保持在同一线程上(例如,对于task2
、task3
、task4
,但您仍然需要在task4
后再进行线程切换以避免task5.Wait()
上的死锁)。
这两种解决方案最终都会使线程池增长,这在性能和可伸缩性方面很糟糕。
现在,如果我们在上面的代码中ProcessAsync
内部各处用await task
替换task.Wait()
,我们将不必使用await Task.Yield
并且仍然不会有死锁。但是,在 ProcessAsync
内部的第一个await task1
之后的整个await
调用链实际上将在ReceiverRun
线程上执行。只要我们不用其他 Wait()
样式的调用阻塞此线程,并且在处理消息时不执行大量 CPU 密集型工作,此方法就可以正常工作(异步 IO 绑定await
样式调用仍然应该正常,它们实际上可能会触发隐式线程切换)。
也就是说,我认为您需要一个单独的线程,上面安装了序列化同步上下文来处理消息(类似于WindowsFormsSynchronizationContext
)。这是包含awaits
的异步代码应运行的位置。您仍然需要避免在该线程上使用Task.Wait
。如果单个消息处理需要大量 CPU 密集型工作,则应使用 Task.Run
来完成此类工作。对于异步 IO 绑定调用,可以保持在同一线程上。
你可能想看看@StephenCleary的ActionDispatcher
/ActionDispatcherSynchronizationContext
Nito 异步库,用于异步消息处理逻辑。希望斯蒂芬能跳进来,提供更好的答案。
"我的假设是第二个SendAwaitResponse将在ThreadPool线程中执行,但它在为ReceiverRun创建的线程中继续执行。
这完全取决于您在 SendAwaitResponse 中执行的操作。 异步和并发不是一回事。
退房:C# 5 异步/等待 - 是*并发*吗?
派对有点晚了,但这是我的解决方案,我认为这是附加值。
我也一直在为此苦苦挣扎,我已经通过在等待的方法上捕获同步上下文来解决它。
它看起来像这样:
// just a default sync context
private readonly SynchronizationContext _defaultContext = new SynchronizationContext();
void ReceiverRun()
{
while (true) // <-- i would replace this with a cancellation token
{
var msg = ReadNextMessage();
TaskWithContext<TResult> task = requests[msg.RequestID];
// if it wasn't a winforms/wpf thread, it would be null
// we choose our default context (threadpool)
var context = task.Context ?? _defaultContext;
// execute it on the context which was captured where it was added. So it won't get completed on this thread.
context.Post(state =>
{
if (msg.Error == null)
task.TaskCompletionSource.SetResult(msg);
else
task.TaskCompletionSource.SetException(new Exception(msg.Error));
});
}
}
public static Task<Response> SendAwaitResponse(string msg)
{
// The key is here! Save the current synchronization context.
var t = new TaskWithContext<Response>(SynchronizationContext.Current);
requests.Add(GetID(msg), t);
stream.Write(msg);
return t.TaskCompletionSource.Task;
}
// class to hold a task and context
public class TaskWithContext<TResult>
{
public SynchronizationContext Context { get; }
public TaskCompletionSource<TResult> TaskCompletionSource { get; } = new TaskCompletionSource<Response>();
public TaskWithContext(SynchronizationContext context)
{
Context = context;
}
}