BlockingCollection(T).GetConsumingEnumerable()如何抛出OperationC

本文关键字:何抛出 OperationC GetConsumingEnumerable BlockingCollection | 更新日期: 2023-09-27 17:57:46

我正在使用BlockingCollection来实现任务调度程序,基本上是:

public class DedicatedThreadScheduler : TaskScheduler, IDisposable
{
    readonly BlockingCollection<Task> m_taskQueue = new BlockingCollection<Task>();
    readonly Thread m_thread;

    public DedicatedThreadScheduler()
    {
        m_thread = new Thread(() =>
        {
            foreach (var task in m_taskQueue.GetConsumingEnumerable())
            {
                TryExecuteTask(task);
            }
            m_taskQueue.Dispose();
        });
        m_thread.Start();
    }
    public void Dispose()
    {
        m_taskQueue.CompleteAdding();
    }
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        return Thread.CurrentThread == m_thread && TryExecuteTask(task);
    }
    (...)
}

我只见过一次,无法重现,但在foreach上的某个时刻(在TryTakeWithNoTimeValidation中),我收到了OperationCanceledException。我不明白,因为我使用的重载不接受CancellationToken,并且文档指出它可能只抛出ObjectDisposedException。例外情况意味着什么?阻止收集已经完成?队列中的任务被取消了?

更新:调用堆栈如下:

mscorlib.dll!System.Threading.SemaphoreSlim.WaitUntilCountOrTimeout(int millisecondsTimeout, uint startTime, System.Threading.CancellationToken cancellationToken) + 0x36 bytes 
mscorlib.dll!System.Threading.SemaphoreSlim.Wait(int millisecondsTimeout, System.Threading.CancellationToken cancellationToken) + 0x178 bytes   
System.dll!System.Collections.Concurrent.BlockingCollection<System.Threading.Tasks.Task>.TryTakeWithNoTimeValidation(out System.Threading.Tasks.Task item, int millisecondsTimeout, System.Threading.CancellationToken cancellationToken, System.Threading.CancellationTokenSource combinedTokenSource) Line 710 + 0x25 bytes   C#
System.dll!System.Collections.Concurrent.BlockingCollection<System.Threading.Tasks.Task>.GetConsumingEnumerable(System.Threading.CancellationToken cancellationToken) Line 1677 + 0x18 bytes    C#

BlockingCollection(T).GetConsumingEnumerable()如何抛出OperationC

这是一个老问题,但我会为将来找到它的人添加完整的答案。尤金提供的答案部分正确;此时,您必须使用Visual Studio进行调试,并将其配置为中断已处理的框架异常。

然而,您破坏OperationCanceledException的实际原因是BlockingCollection<T>.CompleteAdding()的代码如下所示:

    public void CompleteAdding()
    {
        int num;
        this.CheckDisposed();
        if (this.IsAddingCompleted)
        {
            return;
        }
        SpinWait wait = new SpinWait();
    Label_0017:
        num = this.m_currentAdders;
        if ((num & -2147483648) != 0)
        {
            wait.Reset();
            while (this.m_currentAdders != -2147483648)
            {
                wait.SpinOnce();
            }
        }
        else if (Interlocked.CompareExchange(ref this.m_currentAdders, num | -2147483648, num) == num)
        {
            wait.Reset();
            while (this.m_currentAdders != -2147483648)
            {
                wait.SpinOnce();
            }
            if (this.Count == 0)
            {
                this.CancelWaitingConsumers();
            }
            this.CancelWaitingProducers();
        }
        else
        {
            wait.SpinOnce();
            goto Label_0017;
        }
    }

注意这些特定的行:

if (this.Count == 0)
{
    this.CancelWaitingConsumers();
}

称这种方法为

private void CancelWaitingConsumers()
{
    this.m_ConsumersCancellationTokenSource.Cancel();
}

因此,即使您没有在代码中显式使用CancellationToken,如果调用CompleteAdding()BlockingCollection为空,那么底层框架代码也会抛出OperationCanceledException。它这样做是为了向GetConsumingEnumerable()方法发出退出的信号。异常是由框架代码处理的,如果你没有将调试器配置为拦截它,你就不会注意到它

无法复制它的原因是您在Dispose()方法中放置了对CompleteAdding()的调用。因此,它是在GC的突发奇想下被调用的。

我只能推测,但我认为您可能正在体验Stephen Toub在他的"task.Wait and"inlining"博客文章中以及Jon Skeet在这里描述的任务内联场景。

您的TaskScheduler.TryExecuteTaskInline实现是什么样子的?为了防止意外的任务内联,请始终返回false:

override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
    return false;
}

该异常偶尔会在GetConsumingEnumerable枚举器的MoveNext()方法内发生,但它是一个已处理的异常,因此通常您不应该看到它。

也许您已将调试器配置为中断已处理的异常(在Visual Studio中,这些选项位于"调试/异常"菜单中),在这种情况下,调试器可能会中断.NET框架函数内部发生的异常。