ConcurrentList -线程安全未给出
本文关键字:安全 线程 ConcurrentList | 更新日期: 2023-09-27 17:53:45
我看了一下ConcurrentList的实现,发现了这个:http://deanchalk.com/2010/10/c-fast-parallel-concurrentlistt-implementation/
代码在实现中工作良好,您可以在给定的网页上找到,但是当我更改DoWork
方法时:
static void DoWork(ICollection<int> list, int count)
{
for (var i = 0; i < count; i++)
{
list.Add(i);
// use spinwait to emulate work but avoiding
// context switching
Thread.SpinWait(100000);
}
}
更改:
static void DoWork(ICollection<int> list, int count)
{
for (var i = 0; i < count; i++)
{
var foo = list.Any(x => x == 2); //<-- throws exception.. (Enumeration changed)
if (foo == true)
{
//do something...
}
list.Add(i);
}
}
这个更改最终会中断执行,告诉我枚举已被修改。
我如何使这段代码工作?
---- Concurrent List ----
public class ConcurrentList<T> : IList<T>, IList
{
private readonly List<T> underlyingList = new List<T>();
private readonly object syncRoot = new object();
private readonly ConcurrentQueue<T> underlyingQueue;
private bool requiresSync;
private bool isDirty;
public ConcurrentList()
{
underlyingQueue = new ConcurrentQueue<T>();
}
public ConcurrentList(IEnumerable<T> items)
{
underlyingQueue = new ConcurrentQueue<T>(items);
}
private void UpdateLists()
{
if (!isDirty)
return;
lock (syncRoot)
{
requiresSync = true;
T temp;
while (underlyingQueue.TryDequeue(out temp))
underlyingList.Add(temp);
requiresSync = false;
}
}
public IEnumerator<T> GetEnumerator()
{
lock (syncRoot)
{
UpdateLists();
return underlyingList.GetEnumerator();
}
}
IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}
public void Add(T item)
{
if (requiresSync)
lock (syncRoot)
underlyingQueue.Enqueue(item);
else
underlyingQueue.Enqueue(item);
isDirty = true;
}
public int Add(object value)
{
if (requiresSync)
lock (syncRoot)
underlyingQueue.Enqueue((T)value);
else
underlyingQueue.Enqueue((T)value);
isDirty = true;
lock (syncRoot)
{
UpdateLists();
return underlyingList.IndexOf((T)value);
}
}
public bool Contains(object value)
{
lock (syncRoot)
{
UpdateLists();
return underlyingList.Contains((T)value);
}
}
public int IndexOf(object value)
{
lock (syncRoot)
{
UpdateLists();
return underlyingList.IndexOf((T)value);
}
}
public void Insert(int index, object value)
{
lock (syncRoot)
{
UpdateLists();
underlyingList.Insert(index, (T)value);
}
}
public void Remove(object value)
{
lock (syncRoot)
{
UpdateLists();
underlyingList.Remove((T)value);
}
}
public void RemoveAt(int index)
{
lock (syncRoot)
{
UpdateLists();
underlyingList.RemoveAt(index);
}
}
T IList<T>.this[int index]
{
get
{
lock (syncRoot)
{
UpdateLists();
return underlyingList[index];
}
}
set
{
lock (syncRoot)
{
UpdateLists();
underlyingList[index] = value;
}
}
}
object IList.this[int index]
{
get { return ((IList)this)[index]; }
set { ((IList)this)[index] = (T)value; }
}
public bool IsReadOnly
{
get { return false; }
}
public bool IsFixedSize
{
get { return false; }
}
public void Clear()
{
lock (syncRoot)
{
UpdateLists();
underlyingList.Clear();
}
}
public bool Contains(T item)
{
lock (syncRoot)
{
UpdateLists();
return underlyingList.Contains(item);
}
}
public void CopyTo(T[] array, int arrayIndex)
{
lock (syncRoot)
{
UpdateLists();
underlyingList.CopyTo(array, arrayIndex);
}
}
public bool Remove(T item)
{
lock (syncRoot)
{
UpdateLists();
return underlyingList.Remove(item);
}
}
public void CopyTo(Array array, int index)
{
lock (syncRoot)
{
UpdateLists();
underlyingList.CopyTo((T[])array, index);
}
}
public int Count
{
get
{
lock (syncRoot)
{
UpdateLists();
return underlyingList.Count;
}
}
}
public object SyncRoot
{
get { return syncRoot; }
}
public bool IsSynchronized
{
get { return true; }
}
public int IndexOf(T item)
{
lock (syncRoot)
{
UpdateLists();
return underlyingList.IndexOf(item);
}
}
public void Insert(int index, T item)
{
lock (syncRoot)
{
UpdateLists();
underlyingList.Insert(index, item);
}
}
}
---- MainWindow.xaml.cs ----
public MainWindow()
{
InitializeComponent();
Console.WriteLine(@"standard List<T< - 10000 work items");
var list1 = new ConcurrentList<int>();
var start1 = DateTime.Now.Ticks;
DoWork(list1, 10000);
var end1 = DateTime.Now.Ticks;
var c1 = list1.Count; // accesses list
var cend1 = DateTime.Now.Ticks;
Console.WriteLine();
Console.WriteLine(@"Work Time: {0} - milliseconds", (end1 - start1)
/ TimeSpan.TicksPerMillisecond);
Console.WriteLine(@"Get Count Time: {0} - milliseconds",
(cend1 - end1)
/ TimeSpan.TicksPerMillisecond);
Console.WriteLine();
Console.WriteLine();
Console.WriteLine(@"ConcurrentList<T> - 10000 work items on single thread");
var list2 = new ConcurrentList<int>();
var start2 = DateTime.Now.Ticks;
DoWork(list2, 10000);
var end2 = DateTime.Now.Ticks;
var c2 = list2.Count; // accesses list, update performed
var cend2 = DateTime.Now.Ticks;
Console.WriteLine();
Console.WriteLine(@"Work Time: {0} - milliseconds", (end2 - start2)
/ TimeSpan.TicksPerMillisecond);
Console.WriteLine(@"Get Count Time: {0} - milliseconds",
(cend2 - end2)
/ TimeSpan.TicksPerMillisecond);
Console.WriteLine();
Console.WriteLine();
Console.WriteLine(@"ConcurrentList<T> - 10000 work items on 4 parallel tasks");
var list3 = new ConcurrentList<int>();
var start3 = DateTime.Now.Ticks;
var tasks3 = new[]
{
Task.Factory.StartNew(() => DoWork(list3,10000)),
Task.Factory.StartNew(() => DoWork(list3,10000)),
Task.Factory.StartNew(() => DoWork(list3,10000)),
Task.Factory.StartNew(() => DoWork(list3,10000)),
Task.Factory.StartNew(() => DoWork(list3,10000)),
Task.Factory.StartNew(() => DoWork(list3,10000)),
Task.Factory.StartNew(() => DoWork(list3,10000)),
Task.Factory.StartNew(() => DoWork(list3,10000)),
Task.Factory.StartNew(() => DoWork(list3,10000)),
Task.Factory.StartNew(() => DoWork(list3,10000)),
Task.Factory.StartNew(() => DoWork(list3,10000)),
Task.Factory.StartNew(() => DoWork(list3,10000)),
Task.Factory.StartNew(() => DoWork(list3,10000)),
Task.Factory.StartNew(() => DoWork(list3,10000)),
Task.Factory.StartNew(() => DoWork(list3,10000)),
Task.Factory.StartNew(() => DoWork(list3,10000)),
Task.Factory.StartNew(() => DoWork(list3,10000)),
Task.Factory.StartNew(() => DoWork(list3,10000)),
Task.Factory.StartNew(() => DoWork(list3,10000)),
Task.Factory.StartNew(() => DoWork(list3,10000)),
Task.Factory.StartNew(() => DoWork(list3,10000)),
Task.Factory.StartNew(() => DoWork(list3,10000)),
Task.Factory.StartNew(() => DoWork(list3,10000)),
Task.Factory.StartNew(() => DoWork(list3,10000)),
Task.Factory.StartNew(() => DoWork(list3,10000)),
Task.Factory.StartNew(() => DoWork(list3,10000)),
Task.Factory.StartNew(() => DoWork(list3,10000)),
Task.Factory.StartNew(() => DoWork(list3,10000)),
Task.Factory.StartNew(() => DoWork(list3,10000)),
Task.Factory.StartNew(() => DoWork(list3,10000)),
Task.Factory.StartNew(() => DoWork(list3,10000)),
Task.Factory.StartNew(() => DoWork(list3,10000)),
Task.Factory.StartNew(() => DoWork(list3,10000)),
Task.Factory.StartNew(() => DoWork(list3,10000)),
Task.Factory.StartNew(() => DoWork(list3,10000)),
Task.Factory.StartNew(() => DoWork(list3,10000)),
Task.Factory.StartNew(() => DoWork(list3,10000)),
Task.Factory.StartNew(() => DoWork(list3,10000)),
Task.Factory.StartNew(() => DoWork(list3,10000)),
Task.Factory.StartNew(() => DoWork(list3,10000)),
Task.Factory.StartNew(() => DoWork(list3,10000)),
Task.Factory.StartNew(() => DoWork(list3,10000)),
Task.Factory.StartNew(() => DoWork(list3,10000)),
Task.Factory.StartNew(() => DoWork(list3,10000)),
Task.Factory.StartNew(() => DoWork(list3,10000)),
Task.Factory.StartNew(() => DoWork(list3,10000)),
Task.Factory.StartNew(() => DoWork(list3,10000)),
};
Task.WaitAll(tasks3);
var end3 = DateTime.Now.Ticks;
var c3 = list3.Count; // accesses list, update performed
var cend3 = DateTime.Now.Ticks;
Console.WriteLine();
Console.WriteLine(@"Work Time: {0} - milliseconds", (end3 - start3)
/ TimeSpan.TicksPerMillisecond);
Console.WriteLine(@"Get Count Time: {0} - milliseconds",
(cend3 - end3)
/ TimeSpan.TicksPerMillisecond);
Console.WriteLine();
}
static void DoWork(ConcurrentList<int> list, int count)
{
for (var i = 0; i < count; i++)
{
var foo = list.Any(x => x == 2); //<--- throws Exception (Enumeration changed)
if (foo)
{
}
list.Add(Rnd.Next(int.MinValue, int.MaxValue));
// use spinwait to emulate work but avoiding
// context switching
//Thread.SpinWait(100000);
}
}
其实不是关于多线程的。
Any
的实现:
public static bool Any<TSource>(this IEnumerable<TSource> source) {
if (source == null) throw Error.ArgumentNull("source");
using (IEnumerator<TSource> e = source.GetEnumerator()) {
// Suppose that another thread added an item here
// And obviously you get an InvalidOperationException
if (e.MoveNext()) return true;
}
return false;
}
下面的代码将抛出同样的异常:
var collection = new List<int>() { 0 };
foreach (var c in collection) {
collection.Add(c);
}
为了使您的集合线程安全,您可以为每个GetEnumerator
调用返回原始集合副本的枚举器(这是System.Collections.Concurrent
集合的行为方式):
public IEnumerator<T> GetEnumerator()
{
lock (syncRoot)
{
UpdateLists();
return new List<T>(underlyingList).GetEnumerator();
}
}