Nito.AsyncEx异步生产者/消费者队列没有处理
本文关键字:队列 处理 消费者 AsyncEx 异步 生产者 Nito | 更新日期: 2023-09-27 18:16:41
我正在尝试创建一个类似于"消息泵"的自定义同步上下文,该上下文在线程上运行。
程序是Silverlight 5,同步队列来自Nito。AsyncEx nuget (v3.0.1) by Stephen Cleary.
代码(很抱歉,注释/debug故意包含在内):
public sealed class ThreadSynchronizationContext : SynchronizationContext, IDisposable
{
/// <summary>The queue of work items.</summary>
private readonly AsyncProducerConsumerQueue<KeyValuePair<SendOrPostCallback, object>> syncQueue =
new AsyncProducerConsumerQueue<KeyValuePair<SendOrPostCallback, object>>();
private readonly Thread thread = null;
public ThreadSynchronizationContext()
{
Debug.WriteLine("------------------------");
Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - starting worker thread sync context!");
Debug.WriteLine("------------------------");
// using this hack so the new thread will start running before this function returns
using (var hack = new ManualResetEvent(false))
{
thread = new Thread(async obj =>
{
SetSynchronizationContext(obj as SynchronizationContext);
hack.Set();
try
{
Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - awaiting queue available...");
while (await syncQueue.OutputAvailableAsync())
{
Debug.WriteLine("awaiting queue item...");
var workItem = await syncQueue.TryDequeueAsync();
Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - queue item received!");
if (workItem.Success)
{
workItem.Item.Key(workItem.Item.Value);
}
}
Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - queue finished :(");
}
catch (ObjectDisposedException e)
{
Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - queue exception :((");
}
});
thread.Start(this);
hack.WaitOne();
Debug.WriteLine("worker thread: " + WorkerThreadId);
}
}
public int WorkerThreadId { get { return thread.ManagedThreadId; } }
public void Dispose()
{
syncQueue.Dispose();
}
/// <summary>Dispatches an asynchronous message to the synchronization context.</summary>
/// <param name="d">The System.Threading.SendOrPostCallback delegate to call.</param>
/// <param name="state">The object passed to the delegate.</param>
public async override void Post(SendOrPostCallback d, object state)
{
if (d == null) throw new ArgumentNullException("d");
Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - enqueuing item...");
await syncQueue.EnqueueAsync(new KeyValuePair<SendOrPostCallback, object>(d, state));
Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - item enqueued.");
}
/// <summary>Dispatches a synchronous message to the synchronization context.</summary>
/// <param name="d">The System.Threading.SendOrPostCallback delegate to call.</param>
/// <param name="state">The object passed to the delegate.</param>
public override void Send(SendOrPostCallback d, object state)
{
if (d == null) throw new ArgumentNullException("d");
using (var handledEvent = new ManualResetEvent(false))
{
Post(SendOrPostCallback_BlockingWrapper, Tuple.Create(d, state, handledEvent));
Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - waiting for blocking wrapper!");
handledEvent.WaitOne();
Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - blocking wrapper finished.");
}
}
private static void SendOrPostCallback_BlockingWrapper(object state)
{
var innerCallback = (state as Tuple<SendOrPostCallback, object, ManualResetEvent>);
try
{
Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - call callback from blocking wrapper...");
innerCallback.Item1(innerCallback.Item2);
Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - blocking wrapper callback finished.");
}
finally
{
Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - setting handle from blocking wrapper!");
innerCallback.Item3.Set();
}
}
}
问题:
当我启动应用程序,并向上下文发送一些委托时,输出如下:
------------------------
thread 1 - starting worker thread sync context!
------------------------
thread 17 - awaiting queue available...
worker thread: 17
thread 1 - enqueuing item...
thread 8 - enqueuing item...
thread 8 - item enqueued.
thread 1 - item enqueued.
thread 1 - waiting for blocking wrapper!
基本上,程序冻结在Send()
方法的handledEvent.WaitOne();
行,就好像队列从未开始处理添加的项目。
我有点难住了,感谢您的指导。
这里的问题有点棘手,但有一个很好的线索,你会看到你的"enqueueing item…"调试输出两次当你只调用Send
一次
实际发生的是自定义同步上下文被线程的主委托中的await
拾取。因此,它将尝试将其队列处理代码排队到自己的队列中。
分解:
- 线程委托开始执行并到达
await syncQueue.OutputAvailableAsync()
行。 此时,线程委托向当前同步上下文( - 当主叫代码调用
Send
时,将一个项目加入队列,使OutputAvailableAsync
完成。 - 线程委托然后尝试继续执行
Post
到捕获的ThreadSynchronizationContext
。
ThreadSynchronizationContext
实例)注册其延续,然后返回(导致线程退出)。如果你想要一个单线程的同步上下文,那么你根本不应该使用异步线程委托。相反,只需使用同步api:
thread = new Thread(obj =>
{
SetSynchronizationContext(obj as SynchronizationContext);
hack.Set();
try
{
Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - awaiting queue available...");
while (true)
{
Debug.WriteLine("awaiting queue item...");
var workItem = syncQueue.TryDequeue();
Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - queue item received!");
if (!workItem.Success)
break;
workItem.Item.Key(workItem.Item.Value);
}
Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - queue finished :(");
}
catch (ObjectDisposedException e)
{
Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - queue exception :((");
}
});
事实上,我建议完全避免async void
,所以我建议将Post
也设置为同步方法(它仍然是"异步"的,因为它没有立即执行其SendOrPostCallback
委托;它是同步的排队):
public override void Post(SendOrPostCallback d, object state)
{
if (d == null) throw new ArgumentNullException("d");
Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - enqueuing item...");
syncQueue.Enqueue(new KeyValuePair<SendOrPostCallback, object>(d, state));
Debug.WriteLine("thread " + Thread.CurrentThread.ManagedThreadId + " - item enqueued.");
}
或者,您可以省去所有这些痛苦,只使用已经是AsyncEx的一部分的AsyncContextThread
类型。AsyncContextThread
在内部使用自己的单线程同步上下文