在TryReceiveAll之后,OutputAvailableAsync的BufferBlock死锁
本文关键字:BufferBlock 死锁 OutputAvailableAsync TryReceiveAll 之后 | 更新日期: 2023-09-27 18:18:19
在回答这个问题时,我写了下面的代码片段:
var buffer = new BufferBlock<object>();
var producer = Task.Run(async () =>
{
while (true)
{
await Task.Delay(TimeSpan.FromMilliseconds(100));
buffer.Post(null);
Console.WriteLine("Post " + buffer.Count);
}
});
var consumer = Task.Run(async () =>
{
while (await buffer.OutputAvailableAsync())
{
IList<object> items;
buffer.TryReceiveAll(out items);
Console.WriteLine("TryReceiveAll " + buffer.Count);
}
});
await Task.WhenAll(consumer, producer);
生产者应该每隔100毫秒向缓冲区发送物品,消费者应该从缓冲区中清除所有物品,并异步等待更多物品出现。
实际发生的情况是,生产者清除了所有物品一次,然后再也没有移动到OutputAvailableAsync
之外。如果我切换消费者一个接一个地删除项目,它的工作例外:
while (await buffer.OutputAvailableAsync())
{
object item;
while (buffer.TryReceive(out item)) ;
}
我误解了什么吗?如果不是,问题是什么?
这是BufferBlock
内部使用SourceCore
的错误。它的TryReceiveAll
方法不打开_enableOffering
布尔数据成员,而TryReceive
打开。这将导致从OutputAvailableAsync
返回的任务永远不会完成。
这是一个最小的复制:
var buffer = new BufferBlock<object>();
buffer.Post(null);
IList<object> items;
buffer.TryReceiveAll(out items);
var outputAvailableAsync = buffer.OutputAvailableAsync();
buffer.Post(null);
await outputAvailableAsync; // Never completes
我刚刚在。net核心存储库中修复了这个pull请求。
可惜,现在已经是2015年9月底了,虽然i3arnon修复了这个错误,但在修复错误两天后发布的版本中没有解决:Microsoft TPL Dataflow版本4.5.24。
然而IReceivableSourceBlock.TryReceive(…)工作正常。一种扩展方法可以解决这个问题。在TPL Dataflow的新版本发布后,将很容易更改扩展方法。
/// <summary>
/// This extension method returns all available items in the IReceivableSourceBlock
/// or an empty sequence if nothing is available. The functin does not wait.
/// </summary>
/// <typeparam name="T">The type of items stored in the IReceivableSourceBlock</typeparam>
/// <param name="buffer">the source where the items should be extracted from </param>
/// <returns>The IList with the received items. Empty if no items were available</returns>
public static IList<T> TryReceiveAllEx<T>(this IReceivableSourceBlock<T> buffer)
{
/* Microsoft TPL Dataflow version 4.5.24 contains a bug in TryReceiveAll
* Hence this function uses TryReceive until nothing is available anymore
* */
IList<T> receivedItems = new List<T>();
T receivedItem = default(T);
while (buffer.TryReceive<T>(out receivedItem))
{
receivedItems.Add(receivedItem);
}
return receivedItems;
}
用法:
while (await this.bufferBlock.OutputAvailableAsync())
{
// some data available
var receivedItems = this.bufferBlock.TryReceiveAllEx();
if (receivedItems.Any())
{
ProcessReceivedItems(bufferBlock);
}
}