如何将 ConcurrentDictionary 包装在 BlockingCollection 中

本文关键字:BlockingCollection 包装 ConcurrentDictionary | 更新日期: 2023-09-27 18:35:16




BlockingCollection<int> bag = new BlockingCollection<int>(new ConcurrentBag<int>());


如何将 ConcurrentDictionary 包装在 BlockingCollection 中


        ConcurrentDictionary<int, BlockingCollection<string>> mailBoxes = new ConcurrentDictionary<int, BlockingCollection<string>>();
        int maxBoxes = 5;
        CancellationTokenSource cancelationTokenSource = new CancellationTokenSource();
        CancellationToken cancelationToken = cancelationTokenSource.Token;
        Random rnd = new Random();
        // Producer
        Task.Factory.StartNew(() =>
            while (true)
                int index = rnd.Next(0, maxBoxes);
                // put the letter in the mailbox 'index'
                var box = mailBoxes.GetOrAdd(index, new BlockingCollection<string>());
                box.Add("some message " + index, cancelationToken);
                Console.WriteLine("Produced a letter to put in box " + index);
                // Wait simulating a heavy production item.
        // Consumer 1
        Task.Factory.StartNew(() =>
            while (true)
                int index = rnd.Next(0, maxBoxes);
                // get the letter in the mailbox 'index'
                var box = mailBoxes.GetOrAdd(index, new BlockingCollection<string>());
                var message = box.Take(cancelationToken);
                Console.WriteLine("Consumed 1: " + message);
                // consume a item cost less than produce it:
        // Consumer 2
        Task.Factory.StartNew(() =>
            while (true)
                int index = rnd.Next(0, maxBoxes);
                // get the letter in the mailbox 'index'
                var box = mailBoxes.GetOrAdd(index, new BlockingCollection<string>());
                var message = box.Take(cancelationToken);
                Console.WriteLine("Consumed 2: " + message);
                // consume a item cost less than produce it:

通过这种方式,期望在邮箱 5 中收到某些内容的消费者将等到产品者将一封信放入邮箱 5。

您需要编写自己的适配器类 - 如下所示:

public class ConcurrentDictionaryWrapper<TKey,TValue>
    : IProducerConsumerCollection<KeyValuePair<TKey,TValue>>
    private ConcurrentDictionary<TKey, TValue> dictionary;
    public IEnumerator<KeyValuePair<TKey, TValue>> GetEnumerator()
        return dictionary.GetEnumerator();
    IEnumerator IEnumerable.GetEnumerator()
        return GetEnumerator();
    public void CopyTo(Array array, int index)
        throw new NotImplementedException();
    public int Count
        get { return dictionary.Count; }
    public object SyncRoot
        get { return this; }
    public bool IsSynchronized
        get { return true; }
    public void CopyTo(KeyValuePair<TKey, TValue>[] array, int index)
        throw new NotImplementedException();
    public bool TryAdd(KeyValuePair<TKey, TValue> item)
        return dictionary.TryAdd(item.Key, item.Value);
    public bool TryTake(out KeyValuePair<TKey, TValue> item)
        item = dictionary.FirstOrDefault();
        TValue value;
        return dictionary.TryRemove(item.Key, out value);
    public KeyValuePair<TKey, TValue>[] ToArray()
        throw new NotImplementedException();

这是一个由 ConcurrentDictionary<TKey, TValue> 支持的 IProducerConsumerCollection<T> 集合的实现。集合的T类型为 KeyValuePair<TKey, TValue> 。它与 Nick Jones 的实现非常相似,有一些改进:

public class ConcurrentDictionaryProducerConsumer<TKey, TValue>
    : IProducerConsumerCollection<KeyValuePair<TKey, TValue>>
    private readonly ConcurrentDictionary<TKey, TValue> _dictionary;
    private readonly ThreadLocal<IEnumerator<KeyValuePair<TKey, TValue>>> _enumerator;
    public ConcurrentDictionaryProducerConsumer(
        IEqualityComparer<TKey> comparer = default)
        _dictionary = new(comparer);
        _enumerator = new(() => _dictionary.GetEnumerator());
    public bool TryAdd(KeyValuePair<TKey, TValue> entry)
        if (!_dictionary.TryAdd(entry.Key, entry.Value))
            throw new DuplicateKeyException();
        return true;
    public bool TryTake(out KeyValuePair<TKey, TValue> entry)
        // Get a cached enumerator that is used only by the current thread.
        IEnumerator<KeyValuePair<TKey, TValue>> enumerator = _enumerator.Value;
        while (true)
            if (!enumerator.MoveNext())
                throw new InvalidOperationException();
            entry = enumerator.Current;
            if (!_dictionary.TryRemove(entry)) continue;
            return true;
    public int Count => _dictionary.Count;
    public bool IsSynchronized => false;
    public object SyncRoot => throw new NotSupportedException();
    public KeyValuePair<TKey, TValue>[] ToArray() => _dictionary.ToArray();
    public IEnumerator<KeyValuePair<TKey, TValue>> GetEnumerator()
        => _dictionary.GetEnumerator();
    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
    public void CopyTo(KeyValuePair<TKey, TValue>[] array, int index)
        => throw new NotSupportedException();
    public void CopyTo(Array array, int index) => throw new NotSupportedException();
public class DuplicateKeyException : InvalidOperationException { }


BlockingCollection<KeyValuePair<string, Item>> collection
    = new(new ConcurrentDictionaryProducerConsumer<string, Item>());
try { collection.Add(KeyValuePair.Create(key, item)); }
catch (DuplicateKeyException) { Console.WriteLine($"The {key} was rejected."); }


警告: 如果密钥存在,则调用collection.TryAdd(item);不会具有返回false的预期行为。任何添加重复键的尝试都必然会导致DuplicateKeyException 。有关解释,请查看上述其他帖子。

  • 没有找到相关文章