在现有的IProducerConsumerCollection(T)上使用BlockingCollection(T)

本文关键字:BlockingCollection IProducerConsumerCollection | 更新日期: 2023-09-27 18:30:53

我正在从一个接口公开一个IProducerConsumerCollection(T),该接口将定期添加项目供另一个线程使用。

public interface IProducer<T>
{
    IProducerConsumerCollection<T> ProducerCollection { get; }
}

我试图对现有集合使用BlockingCollection(T),但似乎不支持直接添加到IProducerConsumerCollection(T)

var queue = new ConcurrentQueue<string>();
var blockingCollection = new BlockingCollection<string>(queue);
var task1 = Task.Run(() => {
    Console.WriteLine("Dequeued " + blockingCollection.Take());
    Console.WriteLine("Dequeued " + blockingCollection.Take());
});
var task2 = Task.Run(() => {
    Console.WriteLine("Enqueueing Hello");
    queue.Enqueue("Hello");
    Console.WriteLine("Enqueueing World");
    queue.Enqueue("World");
});
Task.WaitAll(task1, task2);

这将无限期挂起,因为BlockingCollection(T)不会注意到新项目。

是否有与BlockingCollection(T).Take方法类似的功能,或者是否有比以下方法更简单的功能:

static async Task<T> TakeAsync<T>(
    IProducerConsumerCollection<T> collection,
    CancellationToken token
)
{
    T result;
    while(!collection.TryTake(out result))
    {
        await Task.Yield();
        token.ThrowIfCancellationRequested();
    }
    return result;
}

在现有的IProducerConsumerCollection(T)上使用BlockingCollection(T)

这将无限期挂起,因为 BlockingCollection(T) 不会注意到新项目。

事实上 - 因为你直接添加到queue.从文档中:

不要直接修改基础集合。使用 BlockingCollection<T> 方法添加或删除元素。如果直接更改基础集合,则BlockingCollection<T>对象可能会损坏。

因此,您对queue.Enqueue的调用应改用blockingCollection

就您的TaskAsync而言 - 您可以使用超时为 Infinite (-1) 的TryTake代替......但你可能也想阅读斯蒂芬·克利里关于类似主题的博客文章......

相关文章:
  • 没有找到相关文章