Concurrent HashSet<T> in .NET Framework?
本文关键字:NET Framework in HashSet lt Concurrent gt | 更新日期: 2023-09-27 18:32:14
我有以下类。
class Test{
public HashSet<string> Data = new HashSet<string>();
}
我需要从不同的线程更改字段"数据",因此我想对我当前的线程安全实现提出一些意见。
class Test{
public HashSet<string> Data = new HashSet<string>();
public void Add(string Val){
lock(Data) Data.Add(Val);
}
public void Remove(string Val){
lock(Data) Data.Remove(Val);
}
}
有没有更好的解决方案,直接进入字段并保护它免受多个线程的并发访问?
您的实现是正确的。不幸的是,.NET Framework 不提供内置的并发哈希集类型。但是,有一些解决方法。
ConcurrentDictionary(推荐)
第一个是使用命名空间中的类ConcurrentDictionary<TKey, TValue>
System.Collections.Concurrent
。在这种情况下,该值毫无意义,因此我们可以使用一个简单的byte
(内存中的 1 个字节)。
private ConcurrentDictionary<string, byte> _data;
这是推荐的选项,因为该类型是线程安全的,并且提供与HashSet<T>
相同的优势,只是键和值是不同的对象。
来源: 社会 MSDN
自我实现
最后,正如你所做的那样,你可以使用锁或 .NET 为你提供的其他方式来实现自己的数据类型,以实现线程安全。这是一个很好的例子:如何在.Net中实现ConcurrentHashSet。
此解决方案的唯一缺点是 HashSet<T>
类型不会正式并发访问,即使对于读取操作也是如此。
我引用链接帖子的代码(最初由Ben Mosher撰写)。
using System;
using System.Collections.Generic;
using System.Threading;
namespace BlahBlah.Utilities
{
public class ConcurrentHashSet<T> : IDisposable
{
private readonly ReaderWriterLockSlim _lock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
private readonly HashSet<T> _hashSet = new HashSet<T>();
#region Implementation of ICollection<T> ...ish
public bool Add(T item)
{
_lock.EnterWriteLock();
try
{
return _hashSet.Add(item);
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}
public void Clear()
{
_lock.EnterWriteLock();
try
{
_hashSet.Clear();
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}
public bool Contains(T item)
{
_lock.EnterReadLock();
try
{
return _hashSet.Contains(item);
}
finally
{
if (_lock.IsReadLockHeld) _lock.ExitReadLock();
}
}
public bool Remove(T item)
{
_lock.EnterWriteLock();
try
{
return _hashSet.Remove(item);
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}
public int Count
{
get
{
_lock.EnterReadLock();
try
{
return _hashSet.Count;
}
finally
{
if (_lock.IsReadLockHeld) _lock.ExitReadLock();
}
}
}
#endregion
#region Dispose
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
if (disposing)
if (_lock != null)
_lock.Dispose();
}
~ConcurrentHashSet()
{
Dispose(false);
}
#endregion
}
}
编辑:将入口锁定方法移到try
块旁边,因为它们可能会引发异常并执行finally
块中包含的指令。
并发包(不建议)
不建议使用 ConcurrentBag<T>
,因为此类型仅允许以线程安全的方式插入给定元素和删除随机元素。此类旨在促进生产者-消费者方案,这不是 OP 的目标(此处有更多解释)。
其他操作(例如,由扩展方法提供)不支持并发使用。MSDN 文档警告:"ConcurrentBag 的所有公共和受保护成员都是线程安全的,可以从多个线程并发使用。但是,通过 ConcurrentBag 实现的接口之一(包括扩展方法)访问的成员不能保证线程安全,可能需要由调用方同步。"
包装ConcurrentDictionary
或锁定HashSet
,而是基于ConcurrentDictionary
创建了一个实际的ConcurrentHashSet
。
此实现支持每个项目的基本操作,而无需HashSet
的集合操作,因为它们在并发方案中意义不大 IMO:
var concurrentHashSet = new ConcurrentHashSet<string>(
new[]
{
"hamster",
"HAMster",
"bar",
},
StringComparer.OrdinalIgnoreCase);
concurrentHashSet.TryRemove("foo");
if (concurrentHashSet.Contains("BAR"))
{
Console.WriteLine(concurrentHashSet.Count);
}
输出:2
你可以从 NuGet 获取它 这里 ,并在这里查看 GitHub 上的源代码。
由于没有其他人提到它,我将提供一种可能适合也可能不适合您的特定目的的替代方法:
Microsoft 不可变集合
来自 MS 团队背后的博客文章:
虽然创建和并发运行比以往任何时候都容易,但仍然存在一个基本问题:可变共享状态。从多个线程读取通常非常容易,但是一旦需要更新状态,就会变得更加困难,尤其是在需要锁定的设计中。
锁定的替代方法是使用不可变状态。不可变的数据结构保证永远不会改变,因此可以在不同的线程之间自由传递,而不必担心踩到别人的脚趾。
但是,这种设计带来了一个新问题:如何在每次都不复制整个状态的情况下管理状态更改?当涉及集合时,这尤其棘手。
这就是不可变集合的用武之地。
这些集合包括ImmutableHashSet
性能
由于不可变集合在下面使用树数据结构来实现结构共享,因此它们的性能特征与可变集合不同。与锁定可变集合相比,结果将取决于锁定争用和访问模式。但是,取自另一篇关于不可变集合的博客文章:
问:我听说不可变集合很慢。这些有什么不同吗?当性能或内存很重要时,我可以使用它们吗?
答:这些不可变集合已经过高度调整,在平衡内存共享的同时具有与可变集合竞争的性能特征。在某些情况下,它们在算法和实际时间上都几乎与可变集合一样快,有时甚至更快,而在其他情况下,它们在算法上更复杂。然而,在许多情况下,差异可以忽略不计。通常,您应该使用最简单的代码来完成工作,然后根据需要调整性能。不可变集合可帮助您编写简单的代码,尤其是在必须考虑线程安全性时。
换句话说,在许多情况下,差异不会很明显,您应该选择更简单的选择 - 对于并发集,将使用ImmutableHashSet<T>
,因为您没有现有的锁定可变实现! :-)
使ISet<T>
并发的棘手之处在于集合方法(并集、交集、差分)本质上是迭代的。 至少您必须遍历操作中涉及的一个集合的所有 n 个成员,同时锁定两个集合。
当您必须在迭代期间锁定整个集时,您将失去ConcurrentDictionary<T,byte>
的优势。 如果不进行锁定,这些操作就不是线程安全的。
考虑到ConcurrentDictionary<T,byte>
增加的开销,使用重量更轻的HashSet<T>
并将所有东西都包围在锁中可能更明智。
如果不需要设置操作,请使用 ConcurrentDictionary<T,byte>
,并在添加键时仅使用 default(byte)
作为值。
基于ConcurrentDictionary<TKey, TValue>
的解决方案通常具有良好的可扩展性;但是,如果您需要访问Count
、Keys
或Values
属性,或者您遍历这些项目,则它比单个锁定集合更糟糕。这是因为ConcurrentDictionary
使用一组锁(默认情况下,它们的数量取决于 CPU 内核的数量),并且访问这些成员会导致获取所有锁,因此 CPU 拥有的内核越多,它们的性能就越差。
另一个答案建议使用不可变集合。尽管它们是线程安全的,但仅当您很少向它们添加新项目时,它们才会表现良好(这总是创建一个新实例,尽管尝试从以前的实例继承尽可能多的节点),但即使在这种情况下,它们通常性能也较差。
我最终得到了另一种解决方案(我后来也将其应用于我的ThreadSafeHashSet<T>
):与ConcurrentDictionary
相反,我只使用单个锁,但只是暂时的:不时地将新物品移动到完全无锁的存储中,即使删除和重新添加相同的密钥也会变得无锁,因此非常快。没有计时器用于执行这些合并。仅当必须访问锁定存储并且它是在"足够长"的时间之前创建的(可配置的)时,才会合并到无锁存储。
注意:请参阅
ThreadSafeDictionary<TKey TValue>
类的"备注"部分的性能比较表(它具有与ThreadSafeHashSet<T>
相同的特征),以查看它是否适合您的需求。如果您想自己执行性能测试,您可以在此处找到它们的来源。
此处提供了源代码,也可以将其下载为 NuGet 包。
我更喜欢完整的解决方案,所以我这样做了: 请注意,我的 Count 是以不同的方式实现的,因为我不明白为什么应该禁止在尝试计算其值时读取哈希集。
@Zen,感谢您开始。
[DebuggerDisplay("Count = {Count}")]
[Serializable]
public class ConcurrentHashSet<T> : ICollection<T>, ISet<T>, ISerializable, IDeserializationCallback
{
private readonly ReaderWriterLockSlim _lock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
private readonly HashSet<T> _hashSet = new HashSet<T>();
public ConcurrentHashSet()
{
}
public ConcurrentHashSet(IEqualityComparer<T> comparer)
{
_hashSet = new HashSet<T>(comparer);
}
public ConcurrentHashSet(IEnumerable<T> collection)
{
_hashSet = new HashSet<T>(collection);
}
public ConcurrentHashSet(IEnumerable<T> collection, IEqualityComparer<T> comparer)
{
_hashSet = new HashSet<T>(collection, comparer);
}
protected ConcurrentHashSet(SerializationInfo info, StreamingContext context)
{
_hashSet = new HashSet<T>();
// not sure about this one really...
var iSerializable = _hashSet as ISerializable;
iSerializable.GetObjectData(info, context);
}
#region Dispose
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
if (disposing)
if (_lock != null)
_lock.Dispose();
}
public IEnumerator<T> GetEnumerator()
{
return _hashSet.GetEnumerator();
}
~ConcurrentHashSet()
{
Dispose(false);
}
public void OnDeserialization(object sender)
{
_hashSet.OnDeserialization(sender);
}
public void GetObjectData(SerializationInfo info, StreamingContext context)
{
_hashSet.GetObjectData(info, context);
}
IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}
#endregion
public void Add(T item)
{
_lock.EnterWriteLock();
try
{
_hashSet.Add(item);
}
finally
{
if(_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}
public void UnionWith(IEnumerable<T> other)
{
_lock.EnterWriteLock();
_lock.EnterReadLock();
try
{
_hashSet.UnionWith(other);
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
if (_lock.IsReadLockHeld) _lock.ExitReadLock();
}
}
public void IntersectWith(IEnumerable<T> other)
{
_lock.EnterWriteLock();
_lock.EnterReadLock();
try
{
_hashSet.IntersectWith(other);
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
if (_lock.IsReadLockHeld) _lock.ExitReadLock();
}
}
public void ExceptWith(IEnumerable<T> other)
{
_lock.EnterWriteLock();
_lock.EnterReadLock();
try
{
_hashSet.ExceptWith(other);
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
if (_lock.IsReadLockHeld) _lock.ExitReadLock();
}
}
public void SymmetricExceptWith(IEnumerable<T> other)
{
_lock.EnterWriteLock();
try
{
_hashSet.SymmetricExceptWith(other);
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}
public bool IsSubsetOf(IEnumerable<T> other)
{
_lock.EnterWriteLock();
try
{
return _hashSet.IsSubsetOf(other);
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}
public bool IsSupersetOf(IEnumerable<T> other)
{
_lock.EnterWriteLock();
try
{
return _hashSet.IsSupersetOf(other);
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}
public bool IsProperSupersetOf(IEnumerable<T> other)
{
_lock.EnterWriteLock();
try
{
return _hashSet.IsProperSupersetOf(other);
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}
public bool IsProperSubsetOf(IEnumerable<T> other)
{
_lock.EnterWriteLock();
try
{
return _hashSet.IsProperSubsetOf(other);
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}
public bool Overlaps(IEnumerable<T> other)
{
_lock.EnterWriteLock();
try
{
return _hashSet.Overlaps(other);
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}
public bool SetEquals(IEnumerable<T> other)
{
_lock.EnterWriteLock();
try
{
return _hashSet.SetEquals(other);
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}
bool ISet<T>.Add(T item)
{
_lock.EnterWriteLock();
try
{
return _hashSet.Add(item);
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}
public void Clear()
{
_lock.EnterWriteLock();
try
{
_hashSet.Clear();
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}
public bool Contains(T item)
{
_lock.EnterWriteLock();
try
{
return _hashSet.Contains(item);
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}
public void CopyTo(T[] array, int arrayIndex)
{
_lock.EnterWriteLock();
try
{
_hashSet.CopyTo(array, arrayIndex);
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}
public bool Remove(T item)
{
_lock.EnterWriteLock();
try
{
return _hashSet.Remove(item);
}
finally
{
if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}
public int Count
{
get
{
_lock.EnterWriteLock();
try
{
return _hashSet.Count;
}
finally
{
if(_lock.IsWriteLockHeld) _lock.ExitWriteLock();
}
}
}
public bool IsReadOnly
{
get { return false; }
}
}
我发现,在需要良好性能的"高吞吐量"场景中,无论是简单地锁定 Add 和 remove System.Collections.Generic.HashSet 的方法,还是包装框架的 ConcurrentDictionary 都是不够的。
使用这个简单的想法已经可以实现相当好的性能:
public class ExampleHashSet<T>
{
const int ConcurrencyLevel = 124;
const int Lower31BitMask = 0x7FFFFFFF;
HashSet<T>[] sets = new HashSet<T>[ConcurrencyLevel];
IEqualityComparer<T> comparer;
public ExampleHashSet()
{
comparer = EqualityComparer<T>.Default;
for(int i = 0; i < ConcurrencyLevel; i++)
sets[i] = new HashSet<T>();
}
public bool Add(T item)
{
int hash = (comparer.GetHashCode(item) & Lower31BitMask) % ConcurrencyLevel;
lock(sets[hash])
{
return sets[hash].Add(item);
}
}
public bool Remove(T item)
{
int hash = (comparer.GetHashCode(item) & Lower31BitMask) % ConcurrencyLevel;
lock(sets[hash])
{
return sets[hash].Remove(item);
}
}
// further methods ...
}
系统的 HashSet 是包装的,但与其他 anwers 相比,我们在多个哈希集。不同的线程能够在不同的哈希集上"工作",从而降低整体等待时间。
这个想法可以概括并直接在 HashSet 本身中实现(在存储桶上保持锁定,而不是锁定成套)。可以在此处找到一个示例。