BlockingCollection(T).GetConsumingEnumerable()如何抛出OperationC
本文关键字:何抛出 OperationC GetConsumingEnumerable BlockingCollection | 更新日期: 2023-09-27 17:57:46
我正在使用BlockingCollection来实现任务调度程序,基本上是:
public class DedicatedThreadScheduler : TaskScheduler, IDisposable
{
readonly BlockingCollection<Task> m_taskQueue = new BlockingCollection<Task>();
readonly Thread m_thread;
public DedicatedThreadScheduler()
{
m_thread = new Thread(() =>
{
foreach (var task in m_taskQueue.GetConsumingEnumerable())
{
TryExecuteTask(task);
}
m_taskQueue.Dispose();
});
m_thread.Start();
}
public void Dispose()
{
m_taskQueue.CompleteAdding();
}
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
return Thread.CurrentThread == m_thread && TryExecuteTask(task);
}
(...)
}
我只见过一次,无法重现,但在foreach上的某个时刻(在TryTakeWithNoTimeValidation中),我收到了OperationCanceledException。我不明白,因为我使用的重载不接受CancellationToken,并且文档指出它可能只抛出ObjectDisposedException。例外情况意味着什么?阻止收集已经完成?队列中的任务被取消了?
更新:调用堆栈如下:
mscorlib.dll!System.Threading.SemaphoreSlim.WaitUntilCountOrTimeout(int millisecondsTimeout, uint startTime, System.Threading.CancellationToken cancellationToken) + 0x36 bytes
mscorlib.dll!System.Threading.SemaphoreSlim.Wait(int millisecondsTimeout, System.Threading.CancellationToken cancellationToken) + 0x178 bytes
System.dll!System.Collections.Concurrent.BlockingCollection<System.Threading.Tasks.Task>.TryTakeWithNoTimeValidation(out System.Threading.Tasks.Task item, int millisecondsTimeout, System.Threading.CancellationToken cancellationToken, System.Threading.CancellationTokenSource combinedTokenSource) Line 710 + 0x25 bytes C#
System.dll!System.Collections.Concurrent.BlockingCollection<System.Threading.Tasks.Task>.GetConsumingEnumerable(System.Threading.CancellationToken cancellationToken) Line 1677 + 0x18 bytes C#
这是一个老问题,但我会为将来找到它的人添加完整的答案。尤金提供的答案部分正确;此时,您必须使用Visual Studio进行调试,并将其配置为中断已处理的框架异常。
然而,您破坏OperationCanceledException
的实际原因是BlockingCollection<T>.CompleteAdding()
的代码如下所示:
public void CompleteAdding()
{
int num;
this.CheckDisposed();
if (this.IsAddingCompleted)
{
return;
}
SpinWait wait = new SpinWait();
Label_0017:
num = this.m_currentAdders;
if ((num & -2147483648) != 0)
{
wait.Reset();
while (this.m_currentAdders != -2147483648)
{
wait.SpinOnce();
}
}
else if (Interlocked.CompareExchange(ref this.m_currentAdders, num | -2147483648, num) == num)
{
wait.Reset();
while (this.m_currentAdders != -2147483648)
{
wait.SpinOnce();
}
if (this.Count == 0)
{
this.CancelWaitingConsumers();
}
this.CancelWaitingProducers();
}
else
{
wait.SpinOnce();
goto Label_0017;
}
}
注意这些特定的行:
if (this.Count == 0)
{
this.CancelWaitingConsumers();
}
称这种方法为
private void CancelWaitingConsumers()
{
this.m_ConsumersCancellationTokenSource.Cancel();
}
因此,即使您没有在代码中显式使用CancellationToken
,如果调用CompleteAdding()
时BlockingCollection
为空,那么底层框架代码也会抛出OperationCanceledException
。它这样做是为了向GetConsumingEnumerable()
方法发出退出的信号。异常是由框架代码处理的,如果你没有将调试器配置为拦截它,你就不会注意到它
无法复制它的原因是您在Dispose()
方法中放置了对CompleteAdding()
的调用。因此,它是在GC的突发奇想下被调用的。
我只能推测,但我认为您可能正在体验Stephen Toub在他的"task.Wait and"inlining"博客文章中以及Jon Skeet在这里描述的任务内联场景。
您的TaskScheduler.TryExecuteTaskInline
实现是什么样子的?为了防止意外的任务内联,请始终返回false
:
override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
return false;
}
该异常偶尔会在GetConsumingEnumerable枚举器的MoveNext()方法内发生,但它是一个已处理的异常,因此通常您不应该看到它。
也许您已将调试器配置为中断已处理的异常(在Visual Studio中,这些选项位于"调试/异常"菜单中),在这种情况下,调试器可能会中断.NET框架函数内部发生的异常。