ConcurrentList -线程安全未给出

本文关键字:安全 线程 ConcurrentList | 更新日期: 2023-09-27 17:53:45

我看了一下ConcurrentList的实现,发现了这个:http://deanchalk.com/2010/10/c-fast-parallel-concurrentlistt-implementation/

代码在实现中工作良好,您可以在给定的网页上找到,但是当我更改DoWork方法时:

原始:

static void DoWork(ICollection<int> list, int count)
{
    for (var i = 0; i < count; i++)
    {
        list.Add(i);
        // use spinwait to emulate work but avoiding 
        // context switching
        Thread.SpinWait(100000);
    }
}

更改:

static void DoWork(ICollection<int> list, int count)
{
    for (var i = 0; i < count; i++)
    {
        var foo = list.Any(x => x == 2); //<-- throws exception.. (Enumeration changed)
        if (foo == true)
        {
            //do something...
        }
        list.Add(i);
    }
}

这个更改最终会中断执行,告诉我枚举已被修改。

我如何使这段代码工作?

---- Concurrent List ----

public class ConcurrentList<T> : IList<T>, IList
{
    private readonly List<T> underlyingList = new List<T>();
    private readonly object syncRoot = new object();
    private readonly ConcurrentQueue<T> underlyingQueue;
    private bool requiresSync;
    private bool isDirty;
    public ConcurrentList()
    {
        underlyingQueue = new ConcurrentQueue<T>();
    }
    public ConcurrentList(IEnumerable<T> items)
    {
        underlyingQueue = new ConcurrentQueue<T>(items);
    }
    private void UpdateLists()
    {
        if (!isDirty)
            return;
        lock (syncRoot)
        {
            requiresSync = true;
            T temp;
            while (underlyingQueue.TryDequeue(out temp))
                underlyingList.Add(temp);
            requiresSync = false;
        }
    }
    public IEnumerator<T> GetEnumerator()
    {
        lock (syncRoot)
        {
            UpdateLists();
            return underlyingList.GetEnumerator();
        }
    }
    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }
    public void Add(T item)
    {
        if (requiresSync)
            lock (syncRoot)
                underlyingQueue.Enqueue(item);
        else
            underlyingQueue.Enqueue(item);
        isDirty = true;
    }
    public int Add(object value)
    {
        if (requiresSync)
            lock (syncRoot)
                underlyingQueue.Enqueue((T)value);
        else
            underlyingQueue.Enqueue((T)value);
        isDirty = true;
        lock (syncRoot)
        {
            UpdateLists();
            return underlyingList.IndexOf((T)value);
        }
    }
    public bool Contains(object value)
    {
        lock (syncRoot)
        {
            UpdateLists();
            return underlyingList.Contains((T)value);
        }
    }
    public int IndexOf(object value)
    {
        lock (syncRoot)
        {
            UpdateLists();
            return underlyingList.IndexOf((T)value);
        }
    }
    public void Insert(int index, object value)
    {
        lock (syncRoot)
        {
            UpdateLists();
            underlyingList.Insert(index, (T)value);
        }
    }
    public void Remove(object value)
    {
        lock (syncRoot)
        {
            UpdateLists();
            underlyingList.Remove((T)value);
        }
    }
    public void RemoveAt(int index)
    {
        lock (syncRoot)
        {
            UpdateLists();
            underlyingList.RemoveAt(index);
        }
    }
    T IList<T>.this[int index]
    {
        get
        {
            lock (syncRoot)
            {
                UpdateLists();
                return underlyingList[index];
            }
        }
        set
        {
            lock (syncRoot)
            {
                UpdateLists();
                underlyingList[index] = value;
            }
        }
    }
    object IList.this[int index]
    {
        get { return ((IList)this)[index]; }
        set { ((IList)this)[index] = (T)value; }
    }
    public bool IsReadOnly
    {
        get { return false; }
    }
    public bool IsFixedSize
    {
        get { return false; }
    }
    public void Clear()
    {
        lock (syncRoot)
        {
            UpdateLists();
            underlyingList.Clear();
        }
    }
    public bool Contains(T item)
    {
        lock (syncRoot)
        {
            UpdateLists();
            return underlyingList.Contains(item);
        }
    }
    public void CopyTo(T[] array, int arrayIndex)
    {
        lock (syncRoot)
        {
            UpdateLists();
            underlyingList.CopyTo(array, arrayIndex);
        }
    }
    public bool Remove(T item)
    {
        lock (syncRoot)
        {
            UpdateLists();
            return underlyingList.Remove(item);
        }
    }
    public void CopyTo(Array array, int index)
    {
        lock (syncRoot)
        {
            UpdateLists();
            underlyingList.CopyTo((T[])array, index);
        }
    }
    public int Count
    {
        get
        {
            lock (syncRoot)
            {
                UpdateLists();
                return underlyingList.Count;
            }
        }
    }
    public object SyncRoot
    {
        get { return syncRoot; }
    }
    public bool IsSynchronized
    {
        get { return true; }
    }
    public int IndexOf(T item)
    {
        lock (syncRoot)
        {
            UpdateLists();
            return underlyingList.IndexOf(item);
        }
    }
    public void Insert(int index, T item)
    {
        lock (syncRoot)
        {
            UpdateLists();
            underlyingList.Insert(index, item);
        }
    }
}

---- MainWindow.xaml.cs ----

public MainWindow()
{
    InitializeComponent();
    Console.WriteLine(@"standard List<T< - 10000 work items");
    var list1 = new ConcurrentList<int>();
    var start1 = DateTime.Now.Ticks;
    DoWork(list1, 10000);
    var end1 = DateTime.Now.Ticks;
    var c1 = list1.Count; // accesses list
    var cend1 = DateTime.Now.Ticks;
    Console.WriteLine();
    Console.WriteLine(@"Work Time: {0} - milliseconds", (end1 - start1)
        / TimeSpan.TicksPerMillisecond);
    Console.WriteLine(@"Get Count Time: {0} - milliseconds",
        (cend1 - end1)
        / TimeSpan.TicksPerMillisecond);
    Console.WriteLine();
    Console.WriteLine();
    Console.WriteLine(@"ConcurrentList<T> - 10000 work items on single thread");
    var list2 = new ConcurrentList<int>();
    var start2 = DateTime.Now.Ticks;
    DoWork(list2, 10000);
    var end2 = DateTime.Now.Ticks;
    var c2 = list2.Count; // accesses list, update performed
    var cend2 = DateTime.Now.Ticks;
    Console.WriteLine();
    Console.WriteLine(@"Work Time: {0} - milliseconds", (end2 - start2)
        / TimeSpan.TicksPerMillisecond);
    Console.WriteLine(@"Get Count Time: {0} - milliseconds",
        (cend2 - end2)
        / TimeSpan.TicksPerMillisecond);
    Console.WriteLine();
    Console.WriteLine();
    Console.WriteLine(@"ConcurrentList<T> - 10000 work items on 4 parallel tasks");
    var list3 = new ConcurrentList<int>();
    var start3 = DateTime.Now.Ticks;
    var tasks3 = new[]
        {
            Task.Factory.StartNew(() => DoWork(list3,10000)),
            Task.Factory.StartNew(() => DoWork(list3,10000)),
            Task.Factory.StartNew(() => DoWork(list3,10000)),
            Task.Factory.StartNew(() => DoWork(list3,10000)),
            Task.Factory.StartNew(() => DoWork(list3,10000)),
            Task.Factory.StartNew(() => DoWork(list3,10000)),
            Task.Factory.StartNew(() => DoWork(list3,10000)),
            Task.Factory.StartNew(() => DoWork(list3,10000)),
            Task.Factory.StartNew(() => DoWork(list3,10000)),
            Task.Factory.StartNew(() => DoWork(list3,10000)),
            Task.Factory.StartNew(() => DoWork(list3,10000)),
            Task.Factory.StartNew(() => DoWork(list3,10000)),
            Task.Factory.StartNew(() => DoWork(list3,10000)),
            Task.Factory.StartNew(() => DoWork(list3,10000)),
            Task.Factory.StartNew(() => DoWork(list3,10000)),
            Task.Factory.StartNew(() => DoWork(list3,10000)),
            Task.Factory.StartNew(() => DoWork(list3,10000)),
            Task.Factory.StartNew(() => DoWork(list3,10000)),
            Task.Factory.StartNew(() => DoWork(list3,10000)),
            Task.Factory.StartNew(() => DoWork(list3,10000)),
            Task.Factory.StartNew(() => DoWork(list3,10000)),
            Task.Factory.StartNew(() => DoWork(list3,10000)),
            Task.Factory.StartNew(() => DoWork(list3,10000)),
            Task.Factory.StartNew(() => DoWork(list3,10000)),
            Task.Factory.StartNew(() => DoWork(list3,10000)),
            Task.Factory.StartNew(() => DoWork(list3,10000)),
            Task.Factory.StartNew(() => DoWork(list3,10000)),
            Task.Factory.StartNew(() => DoWork(list3,10000)),
            Task.Factory.StartNew(() => DoWork(list3,10000)),
            Task.Factory.StartNew(() => DoWork(list3,10000)),
            Task.Factory.StartNew(() => DoWork(list3,10000)),
            Task.Factory.StartNew(() => DoWork(list3,10000)),
            Task.Factory.StartNew(() => DoWork(list3,10000)),
            Task.Factory.StartNew(() => DoWork(list3,10000)),
            Task.Factory.StartNew(() => DoWork(list3,10000)),
            Task.Factory.StartNew(() => DoWork(list3,10000)),
            Task.Factory.StartNew(() => DoWork(list3,10000)),
            Task.Factory.StartNew(() => DoWork(list3,10000)),
            Task.Factory.StartNew(() => DoWork(list3,10000)),
            Task.Factory.StartNew(() => DoWork(list3,10000)),
            Task.Factory.StartNew(() => DoWork(list3,10000)),
            Task.Factory.StartNew(() => DoWork(list3,10000)),
            Task.Factory.StartNew(() => DoWork(list3,10000)),
            Task.Factory.StartNew(() => DoWork(list3,10000)),
            Task.Factory.StartNew(() => DoWork(list3,10000)),
            Task.Factory.StartNew(() => DoWork(list3,10000)),
            Task.Factory.StartNew(() => DoWork(list3,10000)),
        };
    Task.WaitAll(tasks3);
    var end3 = DateTime.Now.Ticks;
    var c3 = list3.Count; // accesses list, update performed
    var cend3 = DateTime.Now.Ticks;
    Console.WriteLine();
    Console.WriteLine(@"Work Time: {0} - milliseconds", (end3 - start3)
        / TimeSpan.TicksPerMillisecond);
    Console.WriteLine(@"Get Count Time: {0} - milliseconds",
            (cend3 - end3)
        / TimeSpan.TicksPerMillisecond);
    Console.WriteLine();
}
static void DoWork(ConcurrentList<int> list, int count)
{
    for (var i = 0; i < count; i++)
    {
        var foo = list.Any(x => x == 2); //<--- throws Exception (Enumeration changed)
        if (foo)
        {
        }
        list.Add(Rnd.Next(int.MinValue, int.MaxValue));
        // use spinwait to emulate work but avoiding 
        // context switching
        //Thread.SpinWait(100000);
    }
}

ConcurrentList -线程安全未给出

其实不是关于多线程的。

Any的实现:

public static bool Any<TSource>(this IEnumerable<TSource> source) {
        if (source == null) throw Error.ArgumentNull("source");
        using (IEnumerator<TSource> e = source.GetEnumerator()) {
            // Suppose that another thread added an item here
            // And obviously you get an InvalidOperationException
            if (e.MoveNext()) return true;
        }
        return false;
}
下面的代码将抛出同样的异常:
var collection = new List<int>() { 0 };
foreach (var c in collection) {
   collection.Add(c);
}

为了使您的集合线程安全,您可以为每个GetEnumerator调用返回原始集合副本的枚举器(这是System.Collections.Concurrent集合的行为方式):

public IEnumerator<T> GetEnumerator()
{
    lock (syncRoot)
    {
        UpdateLists();
        return new List<T>(underlyingList).GetEnumerator();
    }
}