在TryReceiveAll之后,OutputAvailableAsync的BufferBlock死锁

本文关键字:BufferBlock 死锁 OutputAvailableAsync TryReceiveAll 之后 | 更新日期: 2023-09-27 18:18:19

在回答这个问题时,我写了下面的代码片段:

var buffer = new BufferBlock<object>();
var producer = Task.Run(async () =>
{
    while (true)
    {
        await Task.Delay(TimeSpan.FromMilliseconds(100));
        buffer.Post(null);
        Console.WriteLine("Post " + buffer.Count);
    }
});
var consumer = Task.Run(async () =>
{
    while (await buffer.OutputAvailableAsync())
    {
        IList<object> items;
        buffer.TryReceiveAll(out items);
        Console.WriteLine("TryReceiveAll " + buffer.Count);
    }
});
await Task.WhenAll(consumer, producer);

生产者应该每隔100毫秒向缓冲区发送物品,消费者应该从缓冲区中清除所有物品,并异步等待更多物品出现。

实际发生的情况是,生产者清除了所有物品一次,然后再也没有移动到OutputAvailableAsync之外。如果我切换消费者一个接一个地删除项目,它的工作例外:

while (await buffer.OutputAvailableAsync())
{
    object item;
    while (buffer.TryReceive(out item)) ;
}

我误解了什么吗?如果不是,问题是什么?

在TryReceiveAll之后,OutputAvailableAsync的BufferBlock死锁

这是BufferBlock内部使用SourceCore的错误。它的TryReceiveAll方法不打开_enableOffering布尔数据成员,而TryReceive打开。这将导致从OutputAvailableAsync返回的任务永远不会完成。

这是一个最小的复制:

var buffer = new BufferBlock<object>();
buffer.Post(null);
IList<object> items;
buffer.TryReceiveAll(out items);
var outputAvailableAsync = buffer.OutputAvailableAsync();
buffer.Post(null);
await outputAvailableAsync; // Never completes

我刚刚在。net核心存储库中修复了这个pull请求。

可惜,现在已经是2015年9月底了,虽然i3arnon修复了这个错误,但在修复错误两天后发布的版本中没有解决:Microsoft TPL Dataflow版本4.5.24。

然而IReceivableSourceBlock.TryReceive(…)工作正常。一种扩展方法可以解决这个问题。在TPL Dataflow的新版本发布后,将很容易更改扩展方法。

/// <summary>
/// This extension method returns all available items in the IReceivableSourceBlock
/// or an empty sequence if nothing is available. The functin does not wait.
/// </summary>
/// <typeparam name="T">The type of items stored in the IReceivableSourceBlock</typeparam>
/// <param name="buffer">the source where the items should be extracted from </param>
/// <returns>The IList with the received items. Empty if no items were available</returns>
public static IList<T> TryReceiveAllEx<T>(this IReceivableSourceBlock<T> buffer)
{
    /* Microsoft TPL Dataflow version 4.5.24 contains a bug in TryReceiveAll
     * Hence this function uses TryReceive until nothing is available anymore
     * */
    IList<T> receivedItems = new List<T>();
    T receivedItem = default(T);
    while (buffer.TryReceive<T>(out receivedItem))
    {
        receivedItems.Add(receivedItem);
    }
    return receivedItems;
}

用法:

while (await this.bufferBlock.OutputAvailableAsync())
{
    // some data available
    var receivedItems = this.bufferBlock.TryReceiveAllEx();
    if (receivedItems.Any())
    {
        ProcessReceivedItems(bufferBlock);
    }
}