How to wrap a ConcurrentDictionary in a BlockingCollection

Keywords: BlockingCollection, wrap, ConcurrentDictionary | Updated: 2023-09-27 18:35:16

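I tried to implement this by wrapping a ConcurrentDictionary in a BlockingCollection, but it did not seem to work.

I understand that collections declared with a single type parameter, such as ConcurrentBag<T> and ConcurrentQueue<T>, work with BlockingCollection.

So, to create a ConcurrentBag wrapped in a BlockingCollection, I would declare and instantiate it like this: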

BlockingCollection<int> bag = new BlockingCollection<int>(new ConcurrentBag<int>());

But how do I do the same for a ConcurrentDictionary? I need the blocking behavior of BlockingCollection on both the producer side and the consumer side.

Maybe what you need is a ConcurrentDictionary of BlockingCollection instances:

        // Requires: using System; using System.Collections.Concurrent;
        //           using System.Threading; using System.Threading.Tasks;
        // One blocking mailbox per key; a Take on an empty mailbox blocks until a letter arrives.
        ConcurrentDictionary<int, BlockingCollection<string>> mailBoxes = new ConcurrentDictionary<int, BlockingCollection<string>>();
        int maxBoxes = 5;
        CancellationTokenSource cancelationTokenSource = new CancellationTokenSource();
        CancellationToken cancelationToken = cancelationTokenSource.Token;
        // Random is not thread-safe, so the shared instance is only used under a lock.
        Random rnd = new Random();
        Func<int> nextIndex = () => { lock (rnd) { return rnd.Next(0, maxBoxes); } };
        // Producer
        Task.Factory.StartNew(() =>
        {
            while (true)
            {
                int index = nextIndex();
                // Put the letter in the mailbox 'index'.
                // The factory overload creates a new box only when the key is missing.
                var box = mailBoxes.GetOrAdd(index, _ => new BlockingCollection<string>());
                // Throws OperationCanceledException once the token is cancelled, which ends the loop.
                box.Add("some message " + index, cancelationToken);
                Console.WriteLine("Produced a letter to put in box " + index);
                // Wait to simulate an item that is expensive to produce.
                Thread.Sleep(1000);
            }
        });
        // Consumer 1
        Task.Factory.StartNew(() =>
        {
            while (true)
            {
                int index = nextIndex();
                // Get a letter from the mailbox 'index', blocking until one is available.
                var box = mailBoxes.GetOrAdd(index, _ => new BlockingCollection<string>());
                var message = box.Take(cancelationToken);
                Console.WriteLine("Consumed 1: " + message);
                // Consuming an item costs less than producing it.
                Thread.Sleep(50);
            }
        });
        // Consumer 2
        Task.Factory.StartNew(() =>
        {
            while (true)
            {
                int index = nextIndex();
                // Get a letter from the mailbox 'index', blocking until one is available.
                var box = mailBoxes.GetOrAdd(index, _ => new BlockingCollection<string>());
                var message = box.Take(cancelationToken);
                Console.WriteLine("Consumed 2: " + message);
                // Consuming an item costs less than producing it.
                Thread.Sleep(50);
            }
        });
        // Press Enter to cancel; blocked Add/Take calls then throw OperationCanceledException.
        Console.ReadLine();
        cancelationTokenSource.Cancel();

This way, a consumer that expects something in a given mailbox (say, mailbox 4) waits until the producer puts a letter into that mailbox.
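The mailbox idea above can be condensed into a small helper. This is only a sketch under stated assumptions: the class name `MailBoxes` and its methods `Send`, `Receive`, and `TryReceive` are hypothetical and do not appear in the answer; only `ConcurrentDictionary.GetOrAdd` and the `BlockingCollection` members are real framework APIs.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical helper (not from the answer): one BlockingCollection per key.
public sealed class MailBoxes<TKey, TMessage>
{
    private readonly ConcurrentDictionary<TKey, BlockingCollection<TMessage>> _boxes
        = new ConcurrentDictionary<TKey, BlockingCollection<TMessage>>();

    // The factory overload of GetOrAdd creates a box only when the key is missing.
    // Under a race the factory may run more than once, but a single box is stored
    // and every caller receives that same instance.
    private BlockingCollection<TMessage> Box(TKey key)
        => _boxes.GetOrAdd(key, _ => new BlockingCollection<TMessage>());

    // Never blocks here, because the boxes are unbounded.
    public void Send(TKey key, TMessage message) => Box(key).Add(message);

    // Blocks until a message arrives in the box for 'key' or the token is cancelled.
    public TMessage Receive(TKey key, CancellationToken token = default(CancellationToken))
        => Box(key).Take(token);

    // Waits at most 'timeout'; returns false if nothing arrived in time.
    public bool TryReceive(TKey key, out TMessage message, TimeSpan timeout)
        => Box(key).TryTake(out message, timeout);
}
```

A consumer calling `Receive(4)` then blocks only on mailbox 4, regardless of traffic in the other mailboxes. Note that the boxes are never removed from the dictionary, which is acceptable for a small fixed key range but may need cleanup for unbounded keys.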

You will need to write your own adapter class, something like this:

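// Requires: using System; using System.Collections; using System.Collections.Concurrent;
//           using System.Collections.Generic;
public class ConcurrentDictionaryWrapper<TKey,TValue>
    : IProducerConsumerCollection<KeyValuePair<TKey,TValue>>
{
    // Must be initialized; otherwise every member below throws NullReferenceException.
    private readonly ConcurrentDictionary<TKey, TValue> dictionary
        = new ConcurrentDictionary<TKey, TValue>();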
    public IEnumerator<KeyValuePair<TKey, TValue>> GetEnumerator()
    {
        return dictionary.GetEnumerator();
    }
    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }
    public void CopyTo(Array array, int index)
    {
        throw new NotImplementedException();
    }
    public int Count
    {
        get { return dictionary.Count; }
    }
    public object SyncRoot
    {
        get { return this; }
    }
    public bool IsSynchronized
    {
        get { return true; }
    }
    public void CopyTo(KeyValuePair<TKey, TValue>[] array, int index)
    {
        throw new NotImplementedException();
    }
    public bool TryAdd(KeyValuePair<TKey, TValue> item)
    {
        return dictionary.TryAdd(item.Key, item.Value);
    }
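    public bool TryTake(out KeyValuePair<TKey, TValue> item)
    {
        // Enumerate rather than call FirstOrDefault(): on an empty dictionary the
        // default pair can carry a null key, and TryRemove throws on a null key.
        foreach (var pair in dictionary)
        {
            TValue value;
            if (dictionary.TryRemove(pair.Key, out value))
            {
                item = new KeyValuePair<TKey, TValue>(pair.Key, value);
                return true;
            }
            // Another thread removed this key first; try the next one.
        }
        item = default(KeyValuePair<TKey, TValue>);
        return false;
    }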
    public KeyValuePair<TKey, TValue>[] ToArray()
    {
        // ConcurrentDictionary already provides a snapshot copy.
        return dictionary.ToArray();
    }
}

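Here is an implementation of an IProducerConsumerCollection<T> collection that is backed by a ConcurrentDictionary<TKey, TValue>. The T of the collection is KeyValuePair<TKey, TValue>. It is very similar to Nick Jones's implementation, with some improvements: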

public class ConcurrentDictionaryProducerConsumer<TKey, TValue>
    : IProducerConsumerCollection<KeyValuePair<TKey, TValue>>
{
    private readonly ConcurrentDictionary<TKey, TValue> _dictionary;
    private readonly ThreadLocal<IEnumerator<KeyValuePair<TKey, TValue>>> _enumerator;
    public ConcurrentDictionaryProducerConsumer(
        IEqualityComparer<TKey> comparer = default)
    {
        _dictionary = new(comparer);
        _enumerator = new(() => _dictionary.GetEnumerator());
    }
    public bool TryAdd(KeyValuePair<TKey, TValue> entry)
    {
        if (!_dictionary.TryAdd(entry.Key, entry.Value))
            throw new DuplicateKeyException();
        return true;
    }
    public bool TryTake(out KeyValuePair<TKey, TValue> entry)
    {
        // Get a cached enumerator that is used only by the current thread.
        IEnumerator<KeyValuePair<TKey, TValue>> enumerator = _enumerator.Value;
        while (true)
        {
            enumerator.Reset();
            if (!enumerator.MoveNext())
                throw new InvalidOperationException();
            entry = enumerator.Current;
            if (!_dictionary.TryRemove(entry)) continue;
            return true;
        }
    }
    public int Count => _dictionary.Count;
    public bool IsSynchronized => false;
    public object SyncRoot => throw new NotSupportedException();
    public KeyValuePair<TKey, TValue>[] ToArray() => _dictionary.ToArray();
    public IEnumerator<KeyValuePair<TKey, TValue>> GetEnumerator()
        => _dictionary.GetEnumerator();
    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
    public void CopyTo(KeyValuePair<TKey, TValue>[] array, int index)
        => throw new NotSupportedException();
    public void CopyTo(Array array, int index) => throw new NotSupportedException();
}
public class DuplicateKeyException : InvalidOperationException { }

Usage example:

BlockingCollection<KeyValuePair<string, Item>> collection
    = new(new ConcurrentDictionaryProducerConsumer<string, Item>());
//...
try { collection.Add(KeyValuePair.Create(key, item)); }
catch (DuplicateKeyException) { Console.WriteLine($"The {key} was rejected."); }

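The collection.TryTake method removes a practically random key from the ConcurrentDictionary, which is unlikely to be desirable behavior. The performance is also not great, and the memory allocations are significant. For these reasons I can't enthusiastically recommend the implementation above. I suggest looking instead at the ConcurrentQueueNoDuplicates<T> that I posted in another answer, which has proper queue behavior.

Caveat: calling collection.TryAdd(item); does not have the expected behavior of returning false if the key already exists. Any attempt to add a duplicate key invariably results in a DuplicateKeyException. For an explanation, see the other post mentioned above.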
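The caveat is easier to see in isolation. As far as the documented behavior of BlockingCollection<T> goes, it throws an InvalidOperationException when the underlying collection does not accept an item, so a wrapped TryAdd that returns false never reaches the caller as a false result. The sketch below illustrates this with a hypothetical `RejectingCollection<T>` that refuses every insertion; the names `RejectingCollection` and `RejectionDemo` are assumptions made for this example and are not part of the answer.

```csharp
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical collection for illustration: it refuses every insertion.
public sealed class RejectingCollection<T> : IProducerConsumerCollection<T>
{
    private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();

    public bool TryAdd(T item) => false; // always reject
    public bool TryTake(out T item) => _queue.TryDequeue(out item);
    public T[] ToArray() => _queue.ToArray();
    public void CopyTo(T[] array, int index) => _queue.CopyTo(array, index);
    public void CopyTo(Array array, int index) => ((ICollection)_queue).CopyTo(array, index);
    public int Count => _queue.Count;
    public bool IsSynchronized => false;
    public object SyncRoot => throw new NotSupportedException();
    public IEnumerator<T> GetEnumerator() => _queue.GetEnumerator();
    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}

public static class RejectionDemo
{
    // Returns the type of the exception raised by BlockingCollection, or null if none.
    public static Type AddRejectedItem()
    {
        var collection = new BlockingCollection<int>(new RejectingCollection<int>());
        try
        {
            // The wrapped TryAdd returns false, but BlockingCollection does not
            // pass that result through; it throws instead.
            collection.TryAdd(42);
            return null;
        }
        catch (InvalidOperationException ex)
        {
            return ex.GetType();
        }
    }
}
```

This is presumably why the implementation above throws its own DuplicateKeyException, derived from InvalidOperationException: the caller can then tell a rejected duplicate apart from other InvalidOperationException causes, such as adding after CompleteAdding has been called.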
