并发集合和唯一元素

本文关键字:元素 唯一 集合 并发 | 更新日期: 2023-09-27 18:10:22

我有一个具有重复元素的并发BlockingCollection。如何修改它来添加或获得不同的元素?

并发集合和唯一元素

BlockingCollection的默认备份存储为ConcurrentQueue。正如其他人指出的那样,用它来添加不同的项是相当困难的。

但是,您可以创建实现IProducerConsumerCollection的自己的集合类型,并将其传递给BlockingCollection构造函数。

想象一个ConcurrentDictionary,它包含当前在队列中的项的键。要添加项目,首先在字典上调用TryAdd,如果项目不在字典中,则添加它,并将其添加到队列中。Take(和TryTake)从队列中获取下一项,将其从字典中移除,然后返回。

我更喜欢如果有一个并发的HashTable,但因为没有一个,你将不得不与ConcurrentDictionary

下面是一个具有队列行为的IProducerConsumerCollection<T>集合的实现,它也拒绝重复项:

public class ConcurrentQueueNoDuplicates<T> : IProducerConsumerCollection<T>
{
    private readonly Queue<T> _queue = new();
    private readonly HashSet<T> _set;
    private object Locker => _queue;
    public ConcurrentQueueNoDuplicates(IEqualityComparer<T> comparer = default)
    {
        _set = new(comparer);
    }
    public bool TryAdd(T item)
    {
        lock (Locker)
        {
            if (!_set.Add(item))
                throw new DuplicateKeyException();
            _queue.Enqueue(item); return true;
        }
    }
    public bool TryTake(out T item)
    {
        lock (Locker)
        {
            if (_queue.Count == 0)
                throw new InvalidOperationException();
            item = _queue.Dequeue();
            bool removed = _set.Remove(item);
            Debug.Assert(removed);
            return true;
        }
    }
    public int Count { get { lock (Locker) return _queue.Count; } }
    public bool IsSynchronized => false;
    public object SyncRoot => throw new NotSupportedException();
    public T[] ToArray() { lock (Locker) return _queue.ToArray(); }
    public IEnumerator<T> GetEnumerator() => ToArray().AsEnumerable().GetEnumerator();
    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
    public void CopyTo(T[] array, int index) => throw new NotSupportedException();
    public void CopyTo(Array array, int index) => throw new NotSupportedException();
}
public class DuplicateKeyException : InvalidOperationException { }

使用例子:

BlockingCollection<Item> queue = new(new ConcurrentQueueNoDuplicates<Item>());
//...
try { queue.Add(item); }
catch (DuplicateKeyException) { Console.WriteLine($"The {item} was rejected."); }

警告:调用queue.TryAdd(item);没有预期的行为,如果项目是重复的返回false。任何添加重复项的尝试都会导致DuplicateKeyException。不要试图"修复"。通过返回false来实现上述ConcurrentQueueNoDuplicates<T>.TryAddTryTakeBlockingCollection<T>的反应是抛出一个不同的异常(InvalidOperationException),在此之上,它的内部状态将被破坏。目前有……. NET 7)一个bug,它减少了BlockingCollection<T>的有效容量,其底层存储具有返回falseTryAdd实现。. net 8已经修复了这个错误,它将防止损坏,但不会改变抛出错误的行为。