从同步上下文调用异步方法

本文关键字:异步方法 调用 上下文 同步 | 更新日期: 2023-09-27 17:57:54

我在代码中通过HTTP调用服务(最终使用HttpClient.SendAsync方法)。然后从WebAPI控制器操作调用此代码。大多数情况下,它工作得很好(测试通过),但当我在IIS上部署时,我会遇到死锁,因为异步方法调用的调用方已被阻止,并且在完成之前无法在该线程上继续(不会)。

虽然我可以使我的大多数方法异步,但我觉得我对什么时候必须这样做并没有基本的理解。

例如,假设我确实让我的大多数方法都是异步的(因为它们最终会调用其他异步服务方法),如果我构建了一个消息循环,比如说我想要控制并行度,我该如何调用程序的第一个异步方法?

由于HttpClient没有任何同步方法,如果我有一个不支持async的抽象,我可以放心地假设该怎么办?我读过关于ConfigureAwait(false)的文章,但我真的不明白它的作用。它是在异步调用之后设置的,这对我来说很奇怪。对我来说,这感觉就像是一场等待发生的比赛。。。无论多么不可能。。。

WebAPI示例:

public HttpResponseMessage Get()
{
  var userContext = contextService.GetUserContext(); // <-- synchronous
  return ...
}
// Some IUserContextService implementation
public IUserContext GetUserContext()
{
  var httpClient = new HttpClient();
  var result = httpClient.GetAsync(...).Result; // <-- I really don't care if this is asynchronous or not
  return new HttpUserContext(result);
}

消息循环示例:

var mq = new MessageQueue();
// we then run say 8 tasks that do this
for (;;)
{
  var m = mq.Get();
  var c = GetCommand(m);
  c.InvokeAsync().Wait();
  m.Delete();
}

当您有一个允许事情并行发生的消息循环,并且您有异步方法时,就有机会最大限度地减少延迟。基本上,我想在这个例子中实现的是最大限度地减少延迟和空闲时间。尽管我实际上不确定如何调用与从队列中到达的消息相关联的命令。

更具体地说,如果命令调用需要执行服务请求,那么可以用于获取下一条消息的调用中将存在延迟。诸如此类的东西。我完全可以通过将事情打包到队列中并自己协调来实现这一点,但我希望看到这只需要一些异步/等待的东西。

从同步上下文调用异步方法

虽然我可以使我的大多数方法异步,但我觉得我对什么时候必须这样做并没有基本的理解。

从最低级别开始。听起来你已经有了一个开始,但如果你想在最低级别上寻找更多,那么经验法则是任何基于I/O的东西都应该是async(例如HttpClient)。

然后是重复async感染的问题。您想要使用异步方法,所以使用await调用它们。因此该方法必须是async。所以它的所有调用者都必须使用await,所以它们也必须是async,等等。

如果我构建了一个消息循环来控制并行度,那么我该如何调用程序的第一个异步方法呢?

让框架负责这一点是最容易的。例如,您只需从WebAPI操作返回一个Task<T>,框架就可以理解这一点。类似地,UI应用程序内置了一个消息循环,async可以自然地使用它。

如果您遇到框架不理解Task或有内置消息循环(通常是控制台应用程序或Win32服务)的情况,您可以使用我的AsyncEx库中的AsyncContext类型。AsyncContext只是在当前线程上安装一个"主循环"(与async兼容)。

由于HttpClient没有任何同步方法,如果我有一个不支持异步的抽象,我可以放心地假设该怎么办?

正确的方法是改变抽象。不要试图阻止异步代码;我在博客中详细描述了这种常见的死锁情况。

您可以通过使抽象对async友好来更改它。例如,将IUserContext IUserContextService.GetUserContext()更改为Task<IUserContext> IUserContextService.GetUserContextAsync()

我读过ConfigureAwait(false),但我并不真正理解它的作用。它是在异步调用之后设置的,这对我来说很奇怪。

你可能会发现我的async简介很有帮助。在这个答案中,我不会对ConfigureAwait说太多,因为我认为它不直接适用于这个问题的好解决方案(但我并不是说它不好;它实际上应该使用,除非你不能使用它)。

请记住,async是一个具有优先级规则的运算符。一开始感觉很神奇,但实际上并没有那么神奇。此代码:

var result = await httpClient.GetAsync(url).ConfigureAwait(false);

与此代码完全相同:

var asyncOperation = httpClient.GetAsync(url).ConfigureAwait(false);
var result = await asyncOperation;

async代码中通常没有竞争条件,因为即使方法是异步,它也是顺序。该方法可以在await处暂停,并且在await完成之前不会恢复。

当您有一个允许事情并行发生的消息循环,并且您有异步方法时,就有机会最大限度地减少延迟。

这是您第二次提到"并行"的"消息循环",但我认为您真正想要的是让多个(异步)消费者在同一队列中工作,对吗?使用async很容易做到这一点(请注意,在本例中,单个线程上只有一个消息循环;当所有内容都是异步的时,这通常就是您所需要的):

await tasks.WhenAll(ConsumerAsync(), ConsumerAsync(), ConsumerAsync());
async Task ConsumerAsync()
{
  for (;;) // TODO: consider a CancellationToken for orderly shutdown
  {
    var m = await mq.ReceiveAsync();
    var c = GetCommand(m);
    await c.InvokeAsync();
    m.Delete();
  }
}
// Extension method
public static Task<Message> ReceiveAsync(this MessageQueue mq)
{
  return Task<Message>.Factory.FromAsync(mq.BeginReceive, mq.EndReceive, null);
}

您可能也会对TPL数据流感兴趣。Dataflow是一个理解async代码并能很好地使用它的库,并且内置了很好的并行选项。

虽然我很欣赏社区成员的洞察力,但总是很难表达我想要做的事情的意图,但对于获得有关问题周围环境的建议非常有用。有了这些,我终于明白了下面的代码。

public class AsyncOperatingContext
{
  struct Continuation
  {
    private readonly SendOrPostCallback d;
    private readonly object state;
    public Continuation(SendOrPostCallback d, object state)
    {
      this.d = d;
      this.state = state;
    }
    public void Run()
    {
      d(state);
    }
  }
  class BlockingSynchronizationContext : SynchronizationContext
  {
    readonly BlockingCollection<Continuation> _workQueue;
    public BlockingSynchronizationContext(BlockingCollection<Continuation> workQueue)
    {
      _workQueue = workQueue;
    }
    public override void Post(SendOrPostCallback d, object state)
    {
      _workQueue.TryAdd(new Continuation(d, state));
    }
  }
  /// <summary>
  /// Gets the recommended max degree of parallelism. (Your main program message loop could use this value.)
  /// </summary>
  public static int MaxDegreeOfParallelism { get { return Environment.ProcessorCount; } }
  #region Helper methods
  /// <summary>
  /// Run an async task. This method will block execution (and use the calling thread as a worker thread) until the async task has completed.
  /// </summary>
  public static T Run<T>(Func<Task<T>> main, int degreeOfParallelism = 1)
  {
    var asyncOperatingContext = new AsyncOperatingContext();
    asyncOperatingContext.DegreeOfParallelism = degreeOfParallelism;
    return asyncOperatingContext.RunMain(main);
  }
  /// <summary>
  /// Run an async task. This method will block execution (and use the calling thread as a worker thread) until the async task has completed.
  /// </summary>
  public static void Run(Func<Task> main, int degreeOfParallelism = 1)
  {
    var asyncOperatingContext = new AsyncOperatingContext();
    asyncOperatingContext.DegreeOfParallelism = degreeOfParallelism;
    asyncOperatingContext.RunMain(main);
  }
  #endregion
  private readonly BlockingCollection<Continuation> _workQueue;
  public int DegreeOfParallelism { get; set; }
  public AsyncOperatingContext()
  {
    _workQueue = new BlockingCollection<Continuation>();
  }
  /// <summary>
  /// Initialize the current thread's SynchronizationContext so that work is scheduled to run through this AsyncOperatingContext.
  /// </summary>
  protected void InitializeSynchronizationContext()
  {
    SynchronizationContext.SetSynchronizationContext(new BlockingSynchronizationContext(_workQueue));
  }
  protected void RunMessageLoop()
  {
    while (!_workQueue.IsCompleted)
    {
      Continuation continuation;
      if (_workQueue.TryTake(out continuation, Timeout.Infinite))
      {
        continuation.Run();
      }
    }
  }
  protected T RunMain<T>(Func<Task<T>> main)
  {
    var degreeOfParallelism = DegreeOfParallelism;
    if (!((1 <= degreeOfParallelism) & (degreeOfParallelism <= 5000))) // sanity check
    {
      throw new ArgumentOutOfRangeException("DegreeOfParallelism must be between 1 and 5000.", "DegreeOfParallelism");
    }
    var currentSynchronizationContext = SynchronizationContext.Current;
    InitializeSynchronizationContext(); // must set SynchronizationContext before main() task is scheduled
    var mainTask = main(); // schedule "main" task
    mainTask.ContinueWith(task => _workQueue.CompleteAdding());
    // for single threading we don't need worker threads so we don't use any
    // otherwise (for increased parallelism) we simply launch X worker threads
    if (degreeOfParallelism > 1)
    {
      for (int i = 1; i < degreeOfParallelism; i++)
      {
        ThreadPool.QueueUserWorkItem(_ => {
          // do we really need to restore the SynchronizationContext here as well?
          InitializeSynchronizationContext();
          RunMessageLoop();
        });
      }
    }
    RunMessageLoop();
    SynchronizationContext.SetSynchronizationContext(currentSynchronizationContext); // restore
    return mainTask.Result;
  }
  protected void RunMain(Func<Task> main)
  {
    // The return value doesn't matter here
    RunMain(async () => { await main(); return 0; });
  }
}

这门课是完整的,它做了一些我觉得很难掌握的事情。

作为一般建议,您应该允许TAP(基于任务的异步)模式在代码中传播。这可能意味着相当多的重构(或重新设计)。理想情况下,应该允许您将其分解为多个部分,并在努力实现使程序更加异步的总体目标时取得进展。

以同步方式无情地调用异步代码本身就是一件危险的事情。我们的意思是调用WaitResult方法。这可能会导致死锁。解决类似问题的一种方法是使用AsyncOperatingContext.Run方法。它将使用当前线程运行消息循环,直到异步调用完成。它将临时交换出与当前线程相关联的任何SynchronizationContext

注意:我不知道这是否足够,或者是否允许您以这种方式交换回SynchronizationContext,假设可以,这应该可以。我已经被ASP.NET死锁问题所困扰,这可能是一种变通方法。

最后,我发现自己在问一个问题,在async上下文中,Main(string[])的对应等价物是什么?原来这就是消息循环。

我发现有两个东西组成了这个async机器。

CCD_ 36和消息循环。在我的AsyncOperatingContext中,我提供了一个非常简单的消息循环:

protected void RunMessageLoop()
{
  while (!_workQueue.IsCompleted)
  {
    Continuation continuation;
    if (_workQueue.TryTake(out continuation, Timeout.Infinite))
    {
      continuation.Run();
    }
  }
}

我的SynchronizationContext.Post因此变成:

public override void Post(SendOrPostCallback d, object state)
{
  _workQueue.TryAdd(new Continuation(d, state));
}

我们的切入点,基本上相当于来自同步上下文的async main(来自原始源的简化版本):

SynchronizationContext.SetSynchronizationContext(new BlockingSynchronizationContext(_workQueue));
var mainTask = main(); // schedule "main" task
mainTask.ContinueWith(task => _workQueue.CompleteAdding());
RunMessageLoop();
return mainTask.Result;

所有这些都是昂贵的,我们不能用它来替换对async方法的调用,但它确实允许我们快速创建所需的工具,以便在需要的地方继续编写async代码,而不必处理整个程序。从这个实现中也可以清楚地看到工作线程的去向以及它如何影响程序的并发性。

我看到这一点,心想,是的,Node.js就是这么做的。尽管JavaScript没有像C#目前那样出色的异步/等待语言支持。

额外的好处是,我可以完全控制并行度,如果我愿意,我可以运行完全单线程的async任务。但是,如果我这样做并在任何任务上调用WaitResult,它将死锁程序,因为它将阻塞唯一可用的消息循环。