Nito.AsyncEx异步生产者/消费者队列没有处理

本文关键字:队列 处理 消费者 AsyncEx 异步 生产者 Nito | 更新日期: 2023-09-27 18:16:41

我正在尝试创建一个类似于"消息泵"的自定义同步上下文,该上下文在线程上运行。

程序是Silverlight 5,同步队列来自Nito。AsyncEx nuget (v3.0.1) by Stephen Cleary.

代码(很抱歉,注释/debug故意包含在内):

public sealed class ThreadSynchronizationContext : SynchronizationContext, IDisposable
{
    /// <summary>The queue of work items.</summary>
    private readonly AsyncProducerConsumerQueue<KeyValuePair<SendOrPostCallback, object>> syncQueue =
        new AsyncProducerConsumerQueue<KeyValuePair<SendOrPostCallback, object>>();
    private readonly Thread thread = null;
    public ThreadSynchronizationContext()
    {
        Debug.WriteLine("------------------------");
        Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - starting worker thread sync context!");
        Debug.WriteLine("------------------------");
        // using this hack so the new thread will start running before this function returns
        using (var hack = new ManualResetEvent(false))
        {
            thread = new Thread(async obj =>
            {
                SetSynchronizationContext(obj as SynchronizationContext);
                hack.Set();
                try
                {
                    Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - awaiting queue available...");
                    while (await syncQueue.OutputAvailableAsync())
                    {
                        Debug.WriteLine("awaiting queue item...");
                        var workItem = await syncQueue.TryDequeueAsync();
                        Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - queue item received!");
                        if (workItem.Success)
                        {
                            workItem.Item.Key(workItem.Item.Value);
                        }
                    }
                    Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - queue finished :(");
                }
                catch (ObjectDisposedException e)
                {
                    Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - queue exception :((");
                }
            });
            thread.Start(this);
            hack.WaitOne();
            Debug.WriteLine("worker thread: " + WorkerThreadId);
        }
    }
    public int WorkerThreadId { get { return thread.ManagedThreadId; } }
    public void Dispose()
    {
        syncQueue.Dispose();
    }
    /// <summary>Dispatches an asynchronous message to the synchronization context.</summary>
    /// <param name="d">The System.Threading.SendOrPostCallback delegate to call.</param>
    /// <param name="state">The object passed to the delegate.</param>
    public async override void Post(SendOrPostCallback d, object state)
    {
        if (d == null) throw new ArgumentNullException("d");
        Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - enqueuing item...");
        await syncQueue.EnqueueAsync(new KeyValuePair<SendOrPostCallback, object>(d, state));
        Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - item enqueued.");
    }
    /// <summary>Dispatches a synchronous message to the synchronization context.</summary>
    /// <param name="d">The System.Threading.SendOrPostCallback delegate to call.</param>
    /// <param name="state">The object passed to the delegate.</param>
    public override void Send(SendOrPostCallback d, object state)
    {
        if (d == null) throw new ArgumentNullException("d");
        using (var handledEvent = new ManualResetEvent(false))
        {
            Post(SendOrPostCallback_BlockingWrapper, Tuple.Create(d, state, handledEvent));
            Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - waiting for blocking wrapper!");
            handledEvent.WaitOne();
            Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - blocking wrapper finished.");
        }
    }
    private static void SendOrPostCallback_BlockingWrapper(object state)
    {
        var innerCallback = (state as Tuple<SendOrPostCallback, object, ManualResetEvent>);
        try
        {
            Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - call callback from blocking wrapper...");
            innerCallback.Item1(innerCallback.Item2);
            Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - blocking wrapper callback finished.");
        }
        finally
        {
            Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - setting handle from blocking wrapper!");
            innerCallback.Item3.Set();
        }
    }
}

问题:

当我启动应用程序,并向上下文发送一些委托时,输出如下:

------------------------
thread 1 - starting worker thread sync context!
------------------------
thread 17 - awaiting queue available...
worker thread: 17
thread 1 - enqueuing item...
thread 8 - enqueuing item...
thread 8 - item enqueued.
thread 1 - item enqueued.
thread 1 - waiting for blocking wrapper!

基本上,程序冻结在Send()方法的handledEvent.WaitOne();行,就好像队列从未开始处理添加的项目。

我有点难住了,感谢您的指导。

Nito.AsyncEx异步生产者/消费者队列没有处理

这里的问题有点棘手,但有一个很好的线索,你会看到你的"enqueueing item…"调试输出两次当你只调用Send 一次

实际发生的是自定义同步上下文被线程的主委托中的await拾取。因此,它将尝试将其队列处理代码排队到自己的队列中。

分解:

  • 线程委托开始执行并到达await syncQueue.OutputAvailableAsync()行。
  • 此时,线程委托向当前同步上下文(ThreadSynchronizationContext实例)注册其延续,然后返回(导致线程退出)。
  • 当主叫代码调用Send时,将一个项目加入队列,使OutputAvailableAsync完成。
  • 线程委托然后尝试继续执行Post到捕获的ThreadSynchronizationContext

如果你想要一个单线程的同步上下文,那么你根本不应该使用异步线程委托。相反,只需使用同步api:

thread = new Thread(obj =>
{
    SetSynchronizationContext(obj as SynchronizationContext);
    hack.Set();
    try
    {
        Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - awaiting queue available...");
        while (true)
        {
            Debug.WriteLine("awaiting queue item...");
            var workItem = syncQueue.TryDequeue();
            Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - queue item received!");
            if (!workItem.Success)
                break;
            workItem.Item.Key(workItem.Item.Value);
        }
        Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - queue finished :(");
    }
    catch (ObjectDisposedException e)
    {
        Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - queue exception :((");
    }
});

事实上,我建议完全避免async void,所以我建议将Post也设置为同步方法(它仍然是"异步"的,因为它没有立即执行其SendOrPostCallback委托;它是同步的排队):

public override void Post(SendOrPostCallback d, object state)
{
    if (d == null) throw new ArgumentNullException("d");
    Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - enqueuing item...");
    syncQueue.Enqueue(new KeyValuePair<SendOrPostCallback, object>(d, state));
    Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - item enqueued.");
}

或者,您可以省去所有这些痛苦,只使用已经是AsyncEx的一部分的AsyncContextThread类型。AsyncContextThread在内部使用自己的单线程同步上下文