How to wrap a ConcurrentDictionary in a BlockingCollection
Keywords: BlockingCollection, wrap, ConcurrentDictionary | Last updated: 2023-09-27 18:35:16
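I tried to wrap a ConcurrentDictionary in a BlockingCollection, but it does not seem to work.
I understand that a BlockingCollection can be declared over collection types such as ConcurrentBag<T>, ConcurrentQueue<T>, and so on.
So, to create a ConcurrentBag wrapped in a BlockingCollection, I would declare and instantiate it like this: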
BlockingCollection<int> bag = new BlockingCollection<int>(new ConcurrentBag<int>());
But how do I do the same for a ConcurrentDictionary? I need the blocking behavior of BlockingCollection on both the producer and the consumer side.
Maybe what you need is a concurrent dictionary of blocking collections:
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
ConcurrentDictionary<int, BlockingCollection<string>> mailBoxes = new ConcurrentDictionary<int, BlockingCollection<string>>();
int maxBoxes = 5;
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
CancellationToken cancellationToken = cancellationTokenSource.Token;
// Producer
Task.Factory.StartNew(() =>
{
    // Random is not thread-safe, so each task uses its own instance.
    var random = new Random();
    while (true)
    {
        int index = random.Next(0, maxBoxes);
        // Put the letter in the mailbox 'index'. The factory overload of GetOrAdd
        // avoids allocating a new BlockingCollection on every call.
        var box = mailBoxes.GetOrAdd(index, _ => new BlockingCollection<string>());
        // Add and Take throw OperationCanceledException once the token is canceled,
        // which ends the loop.
        box.Add("some message " + index, cancellationToken);
        Console.WriteLine("Produced a letter to put in box " + index);
        // Sleep to simulate an item that is expensive to produce.
        Thread.Sleep(1000);
    }
});
// Consumer 1
Task.Factory.StartNew(() =>
{
    var random = new Random();
    while (true)
    {
        int index = random.Next(0, maxBoxes);
        // Get the letter from the mailbox 'index'; Take blocks until one arrives.
        var box = mailBoxes.GetOrAdd(index, _ => new BlockingCollection<string>());
        var message = box.Take(cancellationToken);
        Console.WriteLine("Consumed 1: " + message);
        // Consuming an item costs less than producing it.
        Thread.Sleep(50);
    }
});
// Consumer 2
Task.Factory.StartNew(() =>
{
    var random = new Random();
    while (true)
    {
        int index = random.Next(0, maxBoxes);
        // Get the letter from the mailbox 'index'; Take blocks until one arrives.
        var box = mailBoxes.GetOrAdd(index, _ => new BlockingCollection<string>());
        var message = box.Take(cancellationToken);
        Console.WriteLine("Consumed 2: " + message);
        // Consuming an item costs less than producing it.
        Thread.Sleep(50);
    }
});
Console.ReadLine();
cancellationTokenSource.Cancel();
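This way, a consumer that expects something in a particular mailbox (say, mailbox 3; with maxBoxes = 5 the indexes run from 0 to 4) waits until the producer puts a letter into that mailbox.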
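The blocking behavior described above can be seen in isolation with a single mailbox. The following is a minimal sketch, not part of the original answer: the MailboxDemo class and its WaitForLetter helper are names made up for illustration, and the delay stands in for a slow producer.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class MailboxDemo
{
    // Hypothetical helper: waits for a letter in one mailbox and returns it.
    public static string WaitForLetter(int boxIndex, int producerDelayMs)
    {
        var mailBoxes = new ConcurrentDictionary<int, BlockingCollection<string>>();

        // Consumer: Take() blocks until the mailbox contains a letter.
        Task<string> consumer = Task.Run(() =>
            mailBoxes.GetOrAdd(boxIndex, _ => new BlockingCollection<string>()).Take());

        // Producer: delivers a letter to the same mailbox after a delay.
        Thread.Sleep(producerDelayMs);
        mailBoxes.GetOrAdd(boxIndex, _ => new BlockingCollection<string>())
                 .Add("letter for box " + boxIndex);

        // Blocks until the consumer has received the letter.
        return consumer.Result;
    }

    public static void Main()
    {
        Console.WriteLine(WaitForLetter(3, 200)); // prints "letter for box 3"
    }
}
```

Whichever side calls GetOrAdd first creates the mailbox; the other side gets the same BlockingCollection instance back, so the order in which producer and consumer start does not matter.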
You need to write your own adapter class, something like this:
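using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
public class ConcurrentDictionaryWrapper<TKey, TValue>
    : IProducerConsumerCollection<KeyValuePair<TKey, TValue>>
{
    // Initialized here; the original field was never assigned and would be null.
    private readonly ConcurrentDictionary<TKey, TValue> dictionary
        = new ConcurrentDictionary<TKey, TValue>();
    public IEnumerator<KeyValuePair<TKey, TValue>> GetEnumerator()
    {
        return dictionary.GetEnumerator();
    }
    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }
    public void CopyTo(Array array, int index)
    {
        ((ICollection)dictionary).CopyTo(array, index);
    }
    public int Count
    {
        get { return dictionary.Count; }
    }
    public object SyncRoot
    {
        get { return this; }
    }
    public bool IsSynchronized
    {
        get { return true; }
    }
    public void CopyTo(KeyValuePair<TKey, TValue>[] array, int index)
    {
        ((ICollection<KeyValuePair<TKey, TValue>>)dictionary).CopyTo(array, index);
    }
    public bool TryAdd(KeyValuePair<TKey, TValue> item)
    {
        return dictionary.TryAdd(item.Key, item.Value);
    }
    public bool TryTake(out KeyValuePair<TKey, TValue> item)
    {
        // Enumerating a ConcurrentDictionary is safe while other threads modify it.
        // Try entries until one is removed; this avoids passing a default (possibly
        // null) key to TryRemove when the dictionary is empty.
        foreach (var pair in dictionary)
        {
            TValue value;
            if (dictionary.TryRemove(pair.Key, out value))
            {
                item = new KeyValuePair<TKey, TValue>(pair.Key, value);
                return true;
            }
        }
        item = default(KeyValuePair<TKey, TValue>);
        return false;
    }
    public KeyValuePair<TKey, TValue>[] ToArray()
    {
        return dictionary.ToArray();
    }
}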
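The adapter is needed because the BlockingCollection<T> constructor accepts an IProducerConsumerCollection<T>, an interface that ConcurrentBag<T> and ConcurrentQueue<T> implement and ConcurrentDictionary<TKey, TValue> does not. A small sketch that checks this with reflection (the WrapCheck class and its Implements helper are hypothetical names used only for this illustration):

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

public static class WrapCheck
{
    // Hypothetical helper: true if TCollection implements
    // IProducerConsumerCollection<TItem> and can therefore be passed
    // to the BlockingCollection<TItem> constructor.
    public static bool Implements<TCollection, TItem>()
        => typeof(IProducerConsumerCollection<TItem>).IsAssignableFrom(typeof(TCollection));

    public static void Main()
    {
        Console.WriteLine(Implements<ConcurrentQueue<int>, int>());  // True
        Console.WriteLine(Implements<ConcurrentBag<int>, int>());    // True
        Console.WriteLine(
            Implements<ConcurrentDictionary<int, string>, KeyValuePair<int, string>>()); // False
    }
}
```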
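Here is an implementation of an IProducerConsumerCollection<T> collection that is backed by a ConcurrentDictionary<TKey, TValue>. The T of the collection is of type KeyValuePair<TKey, TValue>. It is very similar to Nick Jones's implementation, with some improvements: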
public class ConcurrentDictionaryProducerConsumer<TKey, TValue>
    : IProducerConsumerCollection<KeyValuePair<TKey, TValue>>
{
    private readonly ConcurrentDictionary<TKey, TValue> _dictionary;
    private readonly ThreadLocal<IEnumerator<KeyValuePair<TKey, TValue>>> _enumerator;
    public ConcurrentDictionaryProducerConsumer(
        IEqualityComparer<TKey> comparer = default)
    {
        _dictionary = new(comparer);
        _enumerator = new(() => _dictionary.GetEnumerator());
    }
    public bool TryAdd(KeyValuePair<TKey, TValue> entry)
    {
        if (!_dictionary.TryAdd(entry.Key, entry.Value))
            throw new DuplicateKeyException();
        return true;
    }
    public bool TryTake(out KeyValuePair<TKey, TValue> entry)
    {
        // Get a cached enumerator that is used only by the current thread.
        IEnumerator<KeyValuePair<TKey, TValue>> enumerator = _enumerator.Value;
        while (true)
        {
            enumerator.Reset();
            if (!enumerator.MoveNext())
                throw new InvalidOperationException();
            entry = enumerator.Current;
            if (!_dictionary.TryRemove(entry)) continue;
            return true;
        }
    }
    public int Count => _dictionary.Count;
    public bool IsSynchronized => false;
    public object SyncRoot => throw new NotSupportedException();
    public KeyValuePair<TKey, TValue>[] ToArray() => _dictionary.ToArray();
    public IEnumerator<KeyValuePair<TKey, TValue>> GetEnumerator()
        => _dictionary.GetEnumerator();
    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
    public void CopyTo(KeyValuePair<TKey, TValue>[] array, int index)
        => throw new NotSupportedException();
    public void CopyTo(Array array, int index) => throw new NotSupportedException();
}
public class DuplicateKeyException : InvalidOperationException { }
Usage example:
BlockingCollection<KeyValuePair<string, Item>> collection
    = new(new ConcurrentDictionaryProducerConsumer<string, Item>());
//...
try { collection.Add(KeyValuePair.Create(key, item)); }
catch (DuplicateKeyException) { Console.WriteLine($"The {key} was rejected."); }
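The collection.TryTake method removes a practically random key from the ConcurrentDictionary, which is unlikely to be desirable behavior. In addition, the performance is not great and the memory allocations are significant. For these reasons I do not enthusiastically recommend the implementation above. I suggest looking instead at the ConcurrentQueueNoDuplicates<T> that I have posted in another post, which has proper queue behavior.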
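Caution: if the key already exists, calling collection.TryAdd(item); does not have the expected behavior of returning false. Any attempt to add a duplicate key invariably results in a DuplicateKeyException. For an explanation, see the other post mentioned above.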
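As background for this caution: BlockingCollection<T> does not turn a false returned by the underlying TryAdd into a false of its own. Its documentation lists InvalidOperationException for the case where the underlying collection does not accept the item. The sketch below illustrates that with a deliberately useless collection; RejectingCollection<T> and RejectDemo are hypothetical names invented for this example, not part of the answer's code.

```csharp
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical minimal collection whose TryAdd always refuses the item.
public sealed class RejectingCollection<T> : IProducerConsumerCollection<T>
{
    public bool TryAdd(T item) => false;
    public bool TryTake(out T item) { item = default(T); return false; }
    public int Count => 0;
    public bool IsSynchronized => false;
    public object SyncRoot => throw new NotSupportedException();
    public T[] ToArray() => Array.Empty<T>();
    public void CopyTo(T[] array, int index) { }
    public void CopyTo(Array array, int index) { }
    public IEnumerator<T> GetEnumerator() { yield break; }
    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}

public static class RejectDemo
{
    // Returns the name of the exception thrown by BlockingCollection.TryAdd,
    // or "none" if the call returned normally.
    public static string TryAddOutcome()
    {
        using (var collection = new BlockingCollection<int>(new RejectingCollection<int>()))
        {
            try
            {
                collection.TryAdd(1);
                return "none";
            }
            catch (InvalidOperationException ex)
            {
                return ex.GetType().Name;
            }
        }
    }

    public static void Main()
    {
        Console.WriteLine(TryAddOutcome()); // prints "InvalidOperationException"
    }
}
```

Since a rejected item surfaces as an exception either way, throwing a dedicated DuplicateKeyException from the underlying TryAdd lets callers tell a duplicate key apart from other failures.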