没有ConcurrentList< T>在.net 4.0中
本文关键字:net ConcurrentList 没有 | 更新日期: 2023-09-27 18:01:17
看到。net 4.0中新的System.Collections.Concurrent
命名空间,我非常激动,非常好!我见过ConcurrentDictionary
, ConcurrentQueue
, ConcurrentStack
, ConcurrentBag
和BlockingCollection
。
有一样东西似乎神秘地失踪了,那就是ConcurrentList<T>
。我必须自己写吗(或者从网上找来:))?
我错过了什么明显的东西吗?
我给了它一个尝试一会儿回来(也:在GitHub上)。我的实现有一些问题,在这里我就不细讲了。让我告诉你,更重要的是,我学到了什么。
首先,您不可能获得无锁且线程安全的IList<T>
的完整实现。特别是,随机插入和删除不能工作,除非你也忘记了O(1)随机访问(即,除非你"作弊",只是使用某种链表,让索引很糟糕)。
我认为可能是值得的,是一个线程安全的,IList<T>
的有限子集:特别是,允许Add
并提供随机的只读索引访问(但没有Insert
, RemoveAt
等,也没有随机的写访问)。
这是我的ConcurrentList<T>
实现的目标。但是当我在多线程场景中测试它的性能时,我发现简单地将add同步到List<T>
要快得多。基本上,添加List<T>
已经是闪电般的速度了;所涉及的计算步骤的复杂性是微不足道的(增加索引并给数组中的元素赋值;这是,真的是)。你需要一个吨的并发写才能看到任何类型的锁争用;即使这样,每次写入的平均性能仍然会超过ConcurrentList<T>
中更昂贵的无锁实现。
在相对罕见的情况下,列表的内部数组需要调整自己的大小,您确实要付出很小的代价。因此,我最终得出结论,这是一个利基场景,仅添加ConcurrentList<T>
集合类型是有意义的:当您希望保证每次调用都在上添加元素的低开销时(因此,与分摊性能目标相反)。
它根本不像你想象的那样有用。
你会使用ConcurrentList做什么?
在线程世界中,随机访问容器的概念并不像看起来那么有用。语句
if (i < MyConcurrentList.Count)
x = MyConcurrentList[i];
作为一个整体仍然不是线程安全的。
与其创建ConcurrentList,不如尝试用现有的东西来构建解决方案。最常见的类是ConcurrentBag,尤其是BlockingCollection。
尽管已经提供了很好的答案,但有时我只是想要一个线程安全的illist。没有什么高级的或花哨的。在许多情况下,性能很重要,但有时这不是一个问题。是的,如果没有像"TryGetValue"这样的方法,总是会有挑战,但大多数情况下,我只是想要一些我可以枚举的东西,而不必担心把锁放在所有东西周围。是的,有人可能会在我的实现中发现一些可能导致死锁或其他事情的"错误"(我认为),但让我们诚实地说:当涉及到多线程时,如果您没有正确编写代码,它无论如何都会陷入死锁。考虑到这一点,我决定做一个简单的ConcurrentList实现来提供这些基本需求。
至于它的价值:我做了一个基本的测试,将10,000,000个项目添加到常规列表和ConcurrentList中,结果是:
列表完成时间:7793毫秒。并发完成时间:8064毫秒。
public class ConcurrentList<T> : IList<T>, IDisposable
{
#region Fields
private readonly List<T> _list;
private readonly ReaderWriterLockSlim _lock;
#endregion
#region Constructors
public ConcurrentList()
{
this._lock = new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion);
this._list = new List<T>();
}
public ConcurrentList(int capacity)
{
this._lock = new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion);
this._list = new List<T>(capacity);
}
public ConcurrentList(IEnumerable<T> items)
{
this._lock = new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion);
this._list = new List<T>(items);
}
#endregion
#region Methods
public void Add(T item)
{
try
{
this._lock.EnterWriteLock();
this._list.Add(item);
}
finally
{
this._lock.ExitWriteLock();
}
}
public void Insert(int index, T item)
{
try
{
this._lock.EnterWriteLock();
this._list.Insert(index, item);
}
finally
{
this._lock.ExitWriteLock();
}
}
public bool Remove(T item)
{
try
{
this._lock.EnterWriteLock();
return this._list.Remove(item);
}
finally
{
this._lock.ExitWriteLock();
}
}
public void RemoveAt(int index)
{
try
{
this._lock.EnterWriteLock();
this._list.RemoveAt(index);
}
finally
{
this._lock.ExitWriteLock();
}
}
public int IndexOf(T item)
{
try
{
this._lock.EnterReadLock();
return this._list.IndexOf(item);
}
finally
{
this._lock.ExitReadLock();
}
}
public void Clear()
{
try
{
this._lock.EnterWriteLock();
this._list.Clear();
}
finally
{
this._lock.ExitWriteLock();
}
}
public bool Contains(T item)
{
try
{
this._lock.EnterReadLock();
return this._list.Contains(item);
}
finally
{
this._lock.ExitReadLock();
}
}
public void CopyTo(T[] array, int arrayIndex)
{
try
{
this._lock.EnterReadLock();
this._list.CopyTo(array, arrayIndex);
}
finally
{
this._lock.ExitReadLock();
}
}
public IEnumerator<T> GetEnumerator()
{
return new ConcurrentEnumerator<T>(this._list, this._lock);
}
IEnumerator IEnumerable.GetEnumerator()
{
return new ConcurrentEnumerator<T>(this._list, this._lock);
}
~ConcurrentList()
{
this.Dispose(false);
}
public void Dispose()
{
this.Dispose(true);
}
private void Dispose(bool disposing)
{
if (disposing)
GC.SuppressFinalize(this);
this._lock.Dispose();
}
#endregion
#region Properties
public T this[int index]
{
get
{
try
{
this._lock.EnterReadLock();
return this._list[index];
}
finally
{
this._lock.ExitReadLock();
}
}
set
{
try
{
this._lock.EnterWriteLock();
this._list[index] = value;
}
finally
{
this._lock.ExitWriteLock();
}
}
}
public int Count
{
get
{
try
{
this._lock.EnterReadLock();
return this._list.Count;
}
finally
{
this._lock.ExitReadLock();
}
}
}
public bool IsReadOnly
{
get { return false; }
}
#endregion
}
public class ConcurrentEnumerator<T> : IEnumerator<T>
{
#region Fields
private readonly IEnumerator<T> _inner;
private readonly ReaderWriterLockSlim _lock;
#endregion
#region Constructor
public ConcurrentEnumerator(IEnumerable<T> inner, ReaderWriterLockSlim @lock)
{
this._lock = @lock;
this._lock.EnterReadLock();
this._inner = inner.GetEnumerator();
}
#endregion
#region Methods
public bool MoveNext()
{
return _inner.MoveNext();
}
public void Reset()
{
_inner.Reset();
}
public void Dispose()
{
this._lock.ExitReadLock();
}
#endregion
#region Properties
public T Current
{
get { return _inner.Current; }
}
object IEnumerator.Current
{
get { return _inner.Current; }
}
#endregion
}
没有ConcurrentList的原因是它根本不能被写入。原因是IList中的几个重要操作依赖于索引,而这显然是行不通的。例如:
int catIndex = list.IndexOf("cat");
list.Insert(catIndex, "dog");
作者想要达到的效果是在"猫"之前插入"狗",但是在多线程环境中,这两行代码之间的列表可能发生任何事情。例如,另一个线程可能执行list.RemoveAt(0)
,将整个列表向左移动,但关键的是,catIndex不会改变。这里的影响是,Insert
操作实际上将"狗"放在猫之后,而不是在猫之前。
您看到的作为这个问题的"答案"的几个实现是善意的,但是正如上面所示,它们并没有提供可靠的结果。如果您真的想在多线程环境中获得类似列表的语义,那么无法通过在列表实现方法中放入锁来实现。您必须确保所使用的任何索引完全存在于锁的上下文中。结果是,您可以在具有正确锁的多线程环境中使用List,但不能使List本身存在于该环境中。
如果你认为你需要一个并发列表,实际上只有两种可能:
- 你真正需要的是ConcurrentBag
- 你需要创建你自己的集合,也许用List和你自己的并发控制来实现。
如果你有一个ConcurrentBag并且你需要将它作为一个列表传递,那么你就有问题了,因为你调用的方法已经指定他们可能会尝试做一些事情,就像我上面用cat &狗。在大多数情况下,这意味着您所调用的方法根本不是为在多线程环境中工作而构建的。这意味着你要么重构它,使它是,或者,如果你不能,你将不得不非常小心地处理它。您几乎肯定需要使用自己的锁创建自己的集合,并在锁中调用有问题的方法。
ConcurrentList
(作为一个可调整大小的数组,而不是链表)不容易用非阻塞操作编写。它的API不能很好地转换为"并发"版本。
在读的数量大大超过写的数量,或者(无论多么频繁)写的次数不是并发的的情况下,写时复制方法可能是合适的。
下面显示的实现是
- 无锁的
- 对于并发读取来说非常快,即使并发修改正在进行中-无论它们花费多长时间
- 因为"快照"是不可变的,无锁原子性是可能的,即
var snap = _list; snap[snap.Count - 1];
永远不会抛出(当然,除了一个空列表),并且您还可以免费获得具有快照语义的线程安全枚举。我多么喜欢不变性啊! - 一般实现,适用于任何数据结构和任何类型的修改
- 非常简单,即易于测试,调试,通过阅读代码验证
为了使写时复制工作,你必须保持你的数据结构有效地不可变,也就是说,在你使它们可供其他线程使用之后,不允许任何人更改它们。当你想修改时,
- 克隆结构
- 对克隆 进行修改
- 自动将引用交换到修改后的克隆
static class CopyOnWriteSwapper
{
public static void Swap<T>(ref T obj, Func<T, T> cloner, Action<T> op)
where T : class
{
while (true)
{
var objBefore = Volatile.Read(ref obj);
var newObj = cloner(objBefore);
op(newObj);
if (Interlocked.CompareExchange(ref obj, newObj, objBefore) == objBefore)
return;
}
}
}
使用CopyOnWriteSwapper.Swap(ref _myList,
orig => new List<string>(orig),
clone => clone.Add("asdf"));
如果你需要更多的性能,它将有助于消除方法的泛化,例如为你想要的每种类型的修改(Add, Remove,…)创建一个方法,并硬编码函数指针cloner
和op
。
注意:#1确保没有人修改(假定的)不可变数据结构是你的责任。在泛型实现中,我们无法防止这种情况,但是当专门化到List<T>
时,您可以使用List.AsReadOnly()
注意:#2注意列表中的值。上面的写时复制方法只保护它们的列表成员关系,但是如果你不放字符串,而是放一些其他可变对象,你必须注意线程安全(例如锁定)。但这与这个解决方案是正交的,例如,锁定可变值可以很容易地使用而不会出现问题。你只需要意识到这一点。
注意:#3如果您的数据结构非常大,并且您经常修改它,那么就内存消耗和复制所涉及的CPU成本而言,copy-all-on-write方法可能是令人望而却步的。在这种情况下,您可能需要使用MS的不可变集合。
System.Collections.Generic.List<t>
对于多个读取器已经是线程安全的了。试图使它对多个写入器线程安全是没有意义的。(出于Henk和Stephen已经提到的原因)
一些人强调了一些优点(以及我的一些想法):
- 对于无法随机存取器(索引器)来说,它可能看起来很疯狂,但对我来说它看起来很好。您只需要考虑多线程集合上有许多方法可能会失败,例如Indexer和Delete。您还可以为写访问器定义失败(回退)操作,如"失败"或简单地"在末尾添加"。 这并不是因为它是一个多线程集合,它将总是在多线程上下文中使用。或者它也可以只被一个写入器和一个读取器使用。
- 另一种能够以安全的方式使用indexer的方法是使用集合的根(如果公开的话)将动作包装到集合的锁中。 对于许多人来说,使rootLock可见违背了"良好实践"。我不能百分百确定这一点,因为如果它被隐藏,你就会剥夺用户的很多灵活性。我们必须记住,多线程编程不适合任何人。我们无法阻止每一种错误的使用。
- 微软将不得不做一些工作并定义一些新的标准来引入多线程收集的正确使用。首先,IEnumerator不应该有moveNext,但应该有GetNext返回true或false,并获得类型为T的输出参数(这样迭代就不会再阻塞了)。此外,微软已经在foreach内部使用了"using",但有时直接使用IEnumerator而不使用"using"(这在集合视图中是一个bug,可能在更多的地方也是如此)——包装IEnumerator是微软推荐的做法。这个bug消除了安全迭代器的潜力…在构造函数中锁定集合并在其Dispose方法上解锁的迭代器——用于阻塞的foreach方法。
那不是答案。这只是不适合特定位置的注释。
…我的结论是,微软必须对"foreach"做一些深刻的改变,使多线程集合更容易使用。它还必须遵循自己的IEnumerator使用规则。在此之前,我们可以很容易地编写MultiThreadList,它将使用阻塞迭代器,但不会遵循"IList"。相反,您将不得不定义自己的"ilistpersonal"接口,该接口在"插入","删除"和随机访问器(索引器)时可能无一例外地失败。
我实现了一个类似于Brian的。我的不一样:
- 我直接管理数组
- 我没有在try块中输入锁。
- 我使用
yield return
来生成枚举器。 我支持锁递归。这允许在迭代期间读取列表。我尽可能使用可升级的读锁。 -
DoSync
和GetSync
方法允许顺序交互,需要独占访问列表。
代码:
public class ConcurrentList<T> : IList<T>, IDisposable
{
private ReaderWriterLockSlim _lock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
private int _count = 0;
public int Count
{
get
{
_lock.EnterReadLock();
try
{
return _count;
}
finally
{
_lock.ExitReadLock();
}
}
}
public int InternalArrayLength
{
get
{
_lock.EnterReadLock();
try
{
return _arr.Length;
}
finally
{
_lock.ExitReadLock();
}
}
}
private T[] _arr;
public ConcurrentList(int initialCapacity)
{
_arr = new T[initialCapacity];
}
public ConcurrentList():this(4)
{ }
public ConcurrentList(IEnumerable<T> items)
{
_arr = items.ToArray();
_count = _arr.Length;
}
public void Add(T item)
{
_lock.EnterWriteLock();
try
{
var newCount = _count + 1;
EnsureCapacity(newCount);
_arr[_count] = item;
_count = newCount;
}
finally
{
_lock.ExitWriteLock();
}
}
public void AddRange(IEnumerable<T> items)
{
if (items == null)
throw new ArgumentNullException("items");
_lock.EnterWriteLock();
try
{
var arr = items as T[] ?? items.ToArray();
var newCount = _count + arr.Length;
EnsureCapacity(newCount);
Array.Copy(arr, 0, _arr, _count, arr.Length);
_count = newCount;
}
finally
{
_lock.ExitWriteLock();
}
}
private void EnsureCapacity(int capacity)
{
if (_arr.Length >= capacity)
return;
int doubled;
checked
{
try
{
doubled = _arr.Length * 2;
}
catch (OverflowException)
{
doubled = int.MaxValue;
}
}
var newLength = Math.Max(doubled, capacity);
Array.Resize(ref _arr, newLength);
}
public bool Remove(T item)
{
_lock.EnterUpgradeableReadLock();
try
{
var i = IndexOfInternal(item);
if (i == -1)
return false;
_lock.EnterWriteLock();
try
{
RemoveAtInternal(i);
return true;
}
finally
{
_lock.ExitWriteLock();
}
}
finally
{
_lock.ExitUpgradeableReadLock();
}
}
public IEnumerator<T> GetEnumerator()
{
_lock.EnterReadLock();
try
{
for (int i = 0; i < _count; i++)
// deadlocking potential mitigated by lock recursion enforcement
yield return _arr[i];
}
finally
{
_lock.ExitReadLock();
}
}
IEnumerator IEnumerable.GetEnumerator()
{
return this.GetEnumerator();
}
public int IndexOf(T item)
{
_lock.EnterReadLock();
try
{
return IndexOfInternal(item);
}
finally
{
_lock.ExitReadLock();
}
}
private int IndexOfInternal(T item)
{
return Array.FindIndex(_arr, 0, _count, x => x.Equals(item));
}
public void Insert(int index, T item)
{
_lock.EnterUpgradeableReadLock();
try
{
if (index > _count)
throw new ArgumentOutOfRangeException("index");
_lock.EnterWriteLock();
try
{
var newCount = _count + 1;
EnsureCapacity(newCount);
// shift everything right by one, starting at index
Array.Copy(_arr, index, _arr, index + 1, _count - index);
// insert
_arr[index] = item;
_count = newCount;
}
finally
{
_lock.ExitWriteLock();
}
}
finally
{
_lock.ExitUpgradeableReadLock();
}
}
public void RemoveAt(int index)
{
_lock.EnterUpgradeableReadLock();
try
{
if (index >= _count)
throw new ArgumentOutOfRangeException("index");
_lock.EnterWriteLock();
try
{
RemoveAtInternal(index);
}
finally
{
_lock.ExitWriteLock();
}
}
finally
{
_lock.ExitUpgradeableReadLock();
}
}
private void RemoveAtInternal(int index)
{
Array.Copy(_arr, index + 1, _arr, index, _count - index-1);
_count--;
// release last element
Array.Clear(_arr, _count, 1);
}
public void Clear()
{
_lock.EnterWriteLock();
try
{
Array.Clear(_arr, 0, _count);
_count = 0;
}
finally
{
_lock.ExitWriteLock();
}
}
public bool Contains(T item)
{
_lock.EnterReadLock();
try
{
return IndexOfInternal(item) != -1;
}
finally
{
_lock.ExitReadLock();
}
}
public void CopyTo(T[] array, int arrayIndex)
{
_lock.EnterReadLock();
try
{
if(_count > array.Length - arrayIndex)
throw new ArgumentException("Destination array was not long enough.");
Array.Copy(_arr, 0, array, arrayIndex, _count);
}
finally
{
_lock.ExitReadLock();
}
}
public bool IsReadOnly
{
get { return false; }
}
public T this[int index]
{
get
{
_lock.EnterReadLock();
try
{
if (index >= _count)
throw new ArgumentOutOfRangeException("index");
return _arr[index];
}
finally
{
_lock.ExitReadLock();
}
}
set
{
_lock.EnterUpgradeableReadLock();
try
{
if (index >= _count)
throw new ArgumentOutOfRangeException("index");
_lock.EnterWriteLock();
try
{
_arr[index] = value;
}
finally
{
_lock.ExitWriteLock();
}
}
finally
{
_lock.ExitUpgradeableReadLock();
}
}
}
public void DoSync(Action<ConcurrentList<T>> action)
{
GetSync(l =>
{
action(l);
return 0;
});
}
public TResult GetSync<TResult>(Func<ConcurrentList<T>,TResult> func)
{
_lock.EnterWriteLock();
try
{
return func(this);
}
finally
{
_lock.ExitWriteLock();
}
}
public void Dispose()
{
_lock.Dispose();
}
}
在顺序执行的代码中,所使用的数据结构不同于(编写良好的)并发执行的代码。原因是顺序代码意味着隐式顺序。然而并发代码并不意味着任何顺序;更好的是,它意味着缺乏任何已定义的秩序!
因此,具有隐含顺序的数据结构(如List)对于解决并发问题不是很有用。列表意味着顺序,但它并没有清楚地定义这个顺序是什么。因此,操作列表的代码的执行顺序将(在某种程度上)决定列表的隐式顺序,这与高效的并发解决方案直接冲突。
请记住并发是一个数据问题,而不是代码问题!您不能先实现代码(或重写现有的顺序代码)并获得设计良好的并发解决方案。您需要首先设计数据结构,同时记住并发系统中不存在隐式排序。无锁的复制和写入方法在不需要处理太多项目的情况下非常有效。这是我写的一个类:
public class CopyAndWriteList<T>
{
public static List<T> Clear(List<T> list)
{
var a = new List<T>(list);
a.Clear();
return a;
}
public static List<T> Add(List<T> list, T item)
{
var a = new List<T>(list);
a.Add(item);
return a;
}
public static List<T> RemoveAt(List<T> list, int index)
{
var a = new List<T>(list);
a.RemoveAt(index);
return a;
}
public static List<T> Remove(List<T> list, T item)
{
var a = new List<T>(list);
a.Remove(item);
return a;
}
}
使用例子:
我很惊讶没有人提到使用LinkedList
作为编写专门类的基础。
通常我们不需要各种集合类的完整API,如果你编写的代码大多没有函数副作用,尽可能使用不可变类,那么你实际上不会想要改变集合,而喜欢各种快照实现。
LinkedList解决了创建大型集合的快照副本/克隆的一些难题。我也用它来创建threadsafe"枚举器枚举集合。我可以作弊,因为我知道我没有以任何方式改变集合,除了追加,我可以跟踪列表大小,并且只锁定列表大小的更改。然后,我的枚举器代码简单地枚举从0到n的任何想要"快照"的线程;只能追加的集合,它将保证表示一个"快照"。
我很确定大多数需求通常是非常简单的,你只需要2到3个方法。编写一个真正的泛型库是非常困难的,但是解决您自己的代码需求有时可以很容易地使用一两个技巧。
LinkedList
万岁,好的函数式编程。
欢呼,……爱你们!艾尔。
注。示例hack AppendOnly
类在这里:https://github.com/goblinfactory/AppendOnly