Concurrent HashSet<T> in .NET Framework?

本文关键字:NET Framework in HashSet lt Concurrent gt | 更新日期: 2023-09-27 18:32:14

我有以下类。

class Test{
    public HashSet<string> Data = new HashSet<string>();
}

我需要从不同的线程更改字段"数据",因此我想对我当前的线程安全实现提出一些意见。

class Test{
    public HashSet<string> Data = new HashSet<string>();
    public void Add(string Val){
            lock(Data) Data.Add(Val);
    }
    public void Remove(string Val){
            lock(Data) Data.Remove(Val);
    }
}

有没有更好的解决方案,直接进入字段并保护它免受多个线程的并发访问?

Concurrent HashSet<T> in .NET Framework?

您的实现是正确的。不幸的是,.NET Framework 不提供内置的并发哈希集类型。但是,有一些解决方法。

ConcurrentDictionary(推荐)

第一个是使用命名空间中的类ConcurrentDictionary<TKey, TValue> System.Collections.Concurrent 。在这种情况下,该值毫无意义,因此我们可以使用一个简单的byte(内存中的 1 个字节)。

private ConcurrentDictionary<string, byte> _data;

这是推荐的选项,因为该类型是线程安全的,并且提供与HashSet<T>相同的优势,只是键和值是不同的对象。

来源: 社会 MSDN

自我实现

最后,正如你所做的那样,你可以使用锁或 .NET 为你提供的其他方式来实现自己的数据类型,以实现线程安全。这是一个很好的例子:如何在.Net中实现ConcurrentHashSet。

此解决方案的唯一缺点是 HashSet<T> 类型不会正式并发访问,即使对于读取操作也是如此。

我引用链接帖子的代码(最初由Ben Mosher撰写)。

using System;
using System.Collections.Generic;
using System.Threading;
namespace BlahBlah.Utilities
{
    public class ConcurrentHashSet<T> : IDisposable
    {
        private readonly ReaderWriterLockSlim _lock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
        private readonly HashSet<T> _hashSet = new HashSet<T>();
        #region Implementation of ICollection<T> ...ish
        public bool Add(T item)
        {
            _lock.EnterWriteLock();
            try
            {
                return _hashSet.Add(item);
            }
            finally
            {
                if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
            }
        }
        public void Clear()
        {
            _lock.EnterWriteLock();
            try
            {
                _hashSet.Clear();
            }
            finally
            {
                if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
            }
        }
        public bool Contains(T item)
        {
            _lock.EnterReadLock();
            try
            {
                return _hashSet.Contains(item);
            }
            finally
            {
                if (_lock.IsReadLockHeld) _lock.ExitReadLock();
            }
        }
        public bool Remove(T item)
        {
            _lock.EnterWriteLock();
            try
            {
                return _hashSet.Remove(item);
            }
            finally
            {
                if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
            }
        }
        public int Count
        {
            get
            {
                _lock.EnterReadLock();
                try
                {
                    return _hashSet.Count;
                }
                finally
                {
                    if (_lock.IsReadLockHeld) _lock.ExitReadLock();
                }
            }
        }
        #endregion
        #region Dispose
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }
        protected virtual void Dispose(bool disposing)
        {
            if (disposing)
                if (_lock != null)
                    _lock.Dispose();
        }
        ~ConcurrentHashSet()
        {
            Dispose(false);
        }
        #endregion
    }
}

编辑:将入口锁定方法移到try块旁边,因为它们可能会引发异常并执行finally块中包含的指令。

并发包(不建议)

不建议使用 ConcurrentBag<T>,因为此类型仅允许以线程安全的方式插入给定元素和删除随机元素。此类旨在促进生产者-消费者方案,这不是 OP 的目标(此处有更多解释)。

其他操作(例如,由扩展方法提供)不支持并发使用。MSDN 文档警告:"ConcurrentBag 的所有公共和受保护成员都是线程安全的,可以从多个线程并发使用。但是,通过 ConcurrentBag 实现的接口之一(包括扩展方法)访问的成员不能保证线程安全,可能需要由调用方同步。"

我没有

包装ConcurrentDictionary或锁定HashSet,而是基于ConcurrentDictionary创建了一个实际的ConcurrentHashSet

此实现支持每个项目的基本操作,而无需HashSet 的集合操作,因为它们在并发方案中意义不大 IMO:

var concurrentHashSet = new ConcurrentHashSet<string>(
    new[]
    {
        "hamster",
        "HAMster",
        "bar",
    },
    StringComparer.OrdinalIgnoreCase);
concurrentHashSet.TryRemove("foo");
if (concurrentHashSet.Contains("BAR"))
{
    Console.WriteLine(concurrentHashSet.Count);
}

输出:2

你可以从 NuGet 获取它 这里 ,并在这里查看 GitHub 上的源代码。

由于没有其他人提到它,我将提供一种可能适合也可能不适合您的特定目的的替代方法:

Microsoft 不可变集合

来自 MS 团队背后的博客文章:

虽然创建和并发运行比以往任何时候都容易,但仍然存在一个基本问题:可变共享状态。从多个线程读取通常非常容易,但是一旦需要更新状态,就会变得更加困难,尤其是在需要锁定的设计中。

锁定的替代方法是使用不可变状态。不可变的数据结构保证永远不会改变,因此可以在不同的线程之间自由传递,而不必担心踩到别人的脚趾。

但是,这种设计带来了一个新问题:如何在每次都不复制整个状态的情况下管理状态更改?当涉及集合时,这尤其棘手。

这就是不可变集合的用武之地。

这些集合包括ImmutableHashSet和ImmutableList

性能

由于不可变集合在下面使用树数据结构来实现结构共享,因此它们的性能特征与可变集合不同。与锁定可变集合相比,结果将取决于锁定争用和访问模式。但是,取自另一篇关于不可变集合的博客文章:

问:我听说不可变集合很慢。这些有什么不同吗?当性能或内存很重要时,我可以使用它们吗?

答:这些不可变集合已经过高度调整,在平衡内存共享的同时具有与可变集合竞争的性能特征。在某些情况下,它们在算法和实际时间上都几乎与可变集合一样快,有时甚至更快,而在其他情况下,它们在算法上更复杂。然而,在许多情况下,差异可以忽略不计。通常,您应该使用最简单的代码来完成工作,然后根据需要调整性能。不可变集合可帮助您编写简单的代码,尤其是在必须考虑线程安全性时。

换句话说,在许多情况下,差异不会很明显,您应该选择更简单的选择 - 对于并发集,将使用ImmutableHashSet<T>,因为您没有现有的锁定可变实现! :-)

使ISet<T>并发的棘手之处在于集合方法(并集、交集、差分)本质上是迭代的。 至少您必须遍历操作中涉及的一个集合的所有 n 个成员,同时锁定两个集合。

当您必须在迭代期间锁定整个集时,您将失去ConcurrentDictionary<T,byte>的优势。 如果不进行锁定,这些操作就不是线程安全的。

考虑到ConcurrentDictionary<T,byte>增加的开销,使用重量更轻的HashSet<T>并将所有东西都包围在锁中可能更明智。

如果不需要设置操作,请使用 ConcurrentDictionary<T,byte>,并在添加键时仅使用 default(byte) 作为值。

基于ConcurrentDictionary<TKey, TValue>的解决方案通常具有良好的可扩展性;但是,如果您需要访问CountKeysValues属性,或者您遍历这些项目,则它比单个锁定集合更糟糕。这是因为ConcurrentDictionary使用一组锁(默认情况下,它们的数量取决于 CPU 内核的数量),并且访问这些成员会导致获取所有锁,因此 CPU 拥有的内核越多,它们的性能就越差。

另一个答案建议使用不可变集合。尽管它们是线程安全的,但仅当您很少向它们添加新项目时,它们才会表现良好(这总是创建一个新实例,尽管尝试从以前的实例继承尽可能多的节点),但即使在这种情况下,它们通常性能也较差。

我最终得到了另一种解决方案(我后来也将其应用于我的ThreadSafeHashSet<T>):与ConcurrentDictionary相反,我只使用单个锁,但只是暂时的:不时地将新物品移动到完全无锁的存储中,即使删除和重新添加相同的密钥也会变得无锁,因此非常快。没有计时器用于执行这些合并。仅当必须访问锁定存储并且它是在"足够长"的时间之前创建的(可配置的)时,才会合并到无锁存储。

注意:请参阅ThreadSafeDictionary<TKey TValue>类的"备注"部分的性能比较表(它具有与ThreadSafeHashSet<T>相同的特征),以查看它是否适合您的需求。如果您想自己执行性能测试,您可以在此处找到它们的来源。

此处提供了源代码,也可以将其下载为 NuGet 包。

我更喜欢完整的解决方案,所以我这样做了: 请注意,我的 Count 是以不同的方式实现的,因为我不明白为什么应该禁止在尝试计算其值时读取哈希集。

@Zen,感谢您开始。

[DebuggerDisplay("Count = {Count}")]
[Serializable]
public class ConcurrentHashSet<T> : ICollection<T>, ISet<T>, ISerializable, IDeserializationCallback
{
    private readonly ReaderWriterLockSlim _lock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
    private readonly HashSet<T> _hashSet = new HashSet<T>();
    public ConcurrentHashSet()
    {
    }
    public ConcurrentHashSet(IEqualityComparer<T> comparer)
    {
        _hashSet = new HashSet<T>(comparer);
    }
    public ConcurrentHashSet(IEnumerable<T> collection)
    {
        _hashSet = new HashSet<T>(collection);
    }
    public ConcurrentHashSet(IEnumerable<T> collection, IEqualityComparer<T> comparer)
    {
        _hashSet = new HashSet<T>(collection, comparer);
    }
    protected ConcurrentHashSet(SerializationInfo info, StreamingContext context)
    {
        _hashSet = new HashSet<T>();
        // not sure about this one really...
        var iSerializable = _hashSet as ISerializable;
        iSerializable.GetObjectData(info, context);
    }
    #region Dispose
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }
    protected virtual void Dispose(bool disposing)
    {
        if (disposing)
            if (_lock != null)
                _lock.Dispose();
    }
    public IEnumerator<T> GetEnumerator()
    {
        return _hashSet.GetEnumerator();
    }
    ~ConcurrentHashSet()
    {
        Dispose(false);
    }
    public void OnDeserialization(object sender)
    {
        _hashSet.OnDeserialization(sender);
    }
    public void GetObjectData(SerializationInfo info, StreamingContext context)
    {
        _hashSet.GetObjectData(info, context);
    }
    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }
    #endregion
    public void Add(T item)
    {
        _lock.EnterWriteLock();
        try
        {
            _hashSet.Add(item);
        }
        finally
        {
            if(_lock.IsWriteLockHeld) _lock.ExitWriteLock();
        }
    }
    public void UnionWith(IEnumerable<T> other)
    {
        _lock.EnterWriteLock();
        _lock.EnterReadLock();
        try
        {
            _hashSet.UnionWith(other);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
            if (_lock.IsReadLockHeld) _lock.ExitReadLock();
        }
    }
    public void IntersectWith(IEnumerable<T> other)
    {
        _lock.EnterWriteLock();
        _lock.EnterReadLock();
        try
        {
            _hashSet.IntersectWith(other);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
            if (_lock.IsReadLockHeld) _lock.ExitReadLock();
        }
    }
    public void ExceptWith(IEnumerable<T> other)
    {
        _lock.EnterWriteLock();
        _lock.EnterReadLock();
        try
        {
            _hashSet.ExceptWith(other);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
            if (_lock.IsReadLockHeld) _lock.ExitReadLock();
        }
    }
    public void SymmetricExceptWith(IEnumerable<T> other)
    {
        _lock.EnterWriteLock();
        try
        {
            _hashSet.SymmetricExceptWith(other);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
        }
    }
    public bool IsSubsetOf(IEnumerable<T> other)
    {
        _lock.EnterWriteLock();
        try
        {
            return _hashSet.IsSubsetOf(other);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
        }
    }
    public bool IsSupersetOf(IEnumerable<T> other)
    {
        _lock.EnterWriteLock();
        try
        {
            return _hashSet.IsSupersetOf(other);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
        }
    }
    public bool IsProperSupersetOf(IEnumerable<T> other)
    {
        _lock.EnterWriteLock();
        try
        {
            return _hashSet.IsProperSupersetOf(other);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
        }
    }
    public bool IsProperSubsetOf(IEnumerable<T> other)
    {
        _lock.EnterWriteLock();
        try
        {
            return _hashSet.IsProperSubsetOf(other);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
        }
    }
    public bool Overlaps(IEnumerable<T> other)
    {
        _lock.EnterWriteLock();
        try
        {
            return _hashSet.Overlaps(other);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
        }
    }
    public bool SetEquals(IEnumerable<T> other)
    {
        _lock.EnterWriteLock();
        try
        {
            return _hashSet.SetEquals(other);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
        }
    }
    bool ISet<T>.Add(T item)
    {
        _lock.EnterWriteLock();
        try
        {
            return _hashSet.Add(item);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
        }
    }
    public void Clear()
    {
        _lock.EnterWriteLock();
        try
        {
            _hashSet.Clear();
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
        }
    }
    public bool Contains(T item)
    {
        _lock.EnterWriteLock();
        try
        {
            return _hashSet.Contains(item);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
        }
    }
    public void CopyTo(T[] array, int arrayIndex)
    {
        _lock.EnterWriteLock();
        try
        {
            _hashSet.CopyTo(array, arrayIndex);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
        }
    }
    public bool Remove(T item)
    {
        _lock.EnterWriteLock();
        try
        {
            return _hashSet.Remove(item);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
        }
    }
    public int Count
    {
        get
        {
            _lock.EnterWriteLock();
            try
            {
                return _hashSet.Count;
            }
            finally
            {
                if(_lock.IsWriteLockHeld) _lock.ExitWriteLock();
            }
        }
    }
    public bool IsReadOnly
    {
        get { return false; }
    }
}

我发现,在需要良好性能的"高吞吐量"场景中,无论是简单地锁定 Add 和 remove System.Collections.Generic.HashSet 的方法,还是包装框架的 ConcurrentDictionary 都是不够的。

使用这个简单的想法已经可以实现相当好的性能:

public class ExampleHashSet<T>
{
    const int ConcurrencyLevel = 124;
    const int Lower31BitMask = 0x7FFFFFFF;
    
    HashSet<T>[] sets = new HashSet<T>[ConcurrencyLevel];
    IEqualityComparer<T> comparer;
    public ExampleHashSet()
    {
        comparer = EqualityComparer<T>.Default;
        for(int i = 0; i < ConcurrencyLevel; i++)
            sets[i] = new HashSet<T>();
    }
    public bool Add(T item)
    {
        int hash = (comparer.GetHashCode(item) & Lower31BitMask) % ConcurrencyLevel;
        
        lock(sets[hash]) 
        {
            return sets[hash].Add(item);
        }
    }
    public bool Remove(T item)
    {
        int hash = (comparer.GetHashCode(item) & Lower31BitMask) % ConcurrencyLevel;
        
        lock(sets[hash]) 
        {
            return sets[hash].Remove(item);
        }
    }
    // further methods ...
}

系统的 HashSet 是包装的,但与其他 anwers 相比,我们在多个哈希集。不同的线程能够在不同的哈希集上"工作",从而降低整体等待时间。

这个想法可以概括并直接在 HashSet 本身中实现(在存储桶上保持锁定,而不是锁定成套)。可以在此处找到一个示例。