List< T>.RemoveAll作为并行

本文关键字:并行 RemoveAll List | 更新日期: 2023-09-27 18:12:44

我想知道做toProcess.RemoveAll的替代方案,但并行。今天我的代码像我的示例一样工作得很好,但是是顺序的,我希望是并行的。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace ParallelTest
{
    using System.Threading;
    using System.Threading.Tasks;
    class Program
    {
        static void Main(string[] args)
        {
            List<VerifySomethingFromInternet> foo = new List<VerifySomethingFromInternet>();
            foo.Add(new VerifySomethingFromInternet(@"id1", true));
            foo.Add(new VerifySomethingFromInternet(@"id2", false));
            foo.Add(new VerifySomethingFromInternet(@"id3", true));
            foo.Add(new VerifySomethingFromInternet(@"id4", false));
            foo.Add(new VerifySomethingFromInternet(@"id5", true));
            foo.Add(new VerifySomethingFromInternet(@"id6", false));
            DoSomethingFromIntert bar = new DoSomethingFromIntert();
            bar.DoesWork(foo);
            Console.ReadLine();
        }
    }
    public class DoSomethingFromIntert
    {
        bool RemoveIFTrueFromInternet(VerifySomethingFromInternet vsfi)
        {
            Console.WriteLine(String.Format("Identification : {0} - Thread : {1}", vsfi.Identification, Thread.CurrentThread.ManagedThreadId));
            // Do some blocking work at internet
            return vsfi.IsRemovable;
        }
        public void DoesWork(List<VerifySomethingFromInternet> toProcess)
        {
            Console.WriteLine(String.Format("total : {0}", toProcess.Count));
            //Remove all true return
            toProcess.RemoveAll(f => this.RemoveIFTrueFromInternet(f));
            Console.WriteLine(String.Format("total : {0}", toProcess.Count));
        }
    }
    public class VerifySomethingFromInternet
    {
        public VerifySomethingFromInternet(string id, bool remove)
        {
            this.Identification = id;
            this.IsRemovable = remove;
        }
        public string Identification { get; set; }
        public bool IsRemovable { get; set; }
    }
}

List< T>.RemoveAll作为并行

var newList = toProcess.AsParallel ()
               .Where (f => !this.RemoveIFTrueFromInternet(f))
               .ToList ();
toProcess = newList;

可能这回答了你的问题,但我不确定它是否真的更快。试着衡量一下。

注意,这可能会改变列表中元素的顺序。如果你关心顺序,在AsParallel后面加上AsOrdered。(感谢weston的[隐式]提示)。

List<T>不是线程安全的,所以没有办法与这种类型的列表并行。

你可以使用线程安全的ConcurrentBag代替,但显然没有RemoveAll方法。

您还可以将列表转换为数组,编辑该数组,并将其再次传递给list。

我试着重构一下你的代码

我使用BlockingCollection来实现一个生产者消费者场景

这不是并行删除,但它可以通过并行处理来解决你的问题,试试吧,你可能会喜欢它

class Program
{
    static void Main(string[] args)
    {
        DoSomethingFromIntert bar = new DoSomethingFromIntert();  
        bar.Verify(@"id1", true);
        bar.Verify(@"id2", false);
        bar.Verify(@"id3", true);
        bar.Verify(@"id4", false);
        bar.Verify(@"id5", true);
        bar.Verify(@"id6", false);
        bar.Complete();
        Console.ReadLine();
    }
}
public class DoSomethingFromIntert
{
    BlockingCollection<VerifySomethingFromInternet> toProcess = new BlockingCollection<VerifySomethingFromInternet>();
    ConcurrentBag<VerifySomethingFromInternet> workinglist = new ConcurrentBag<VerifySomethingFromInternet>();
    public DoSomethingFromIntert()
    {
        //init four consumers you may choose as many as you want
        ThreadPool.QueueUserWorkItem(DoesWork);
        ThreadPool.QueueUserWorkItem(DoesWork);
        ThreadPool.QueueUserWorkItem(DoesWork);
        ThreadPool.QueueUserWorkItem(DoesWork);
    }
    public void Verify(string param, bool flag)
    {
        //add to the processing list
        toProcess.TryAdd(new VerifySomethingFromInternet(param, flag));
    }
    public void Complete()
    {
        //mark producer as complete and let the threads exit when finished verifying
        toProcess.CompleteAdding();
    }
    bool RemoveIFTrueFromInternet(VerifySomethingFromInternet vsfi)
    {
        Console.WriteLine(String.Format("Identification : {0} - Thread : {1}", vsfi.Identification, Thread.CurrentThread.ManagedThreadId));
        // Do some blocking work at internet
        return vsfi.IsRemovable;
    }
    private void DoesWork(object state)
    {
        Console.WriteLine(String.Format("total : {0}", toProcess.Count));
        foreach (var item in toProcess.GetConsumingEnumerable())
        {
            //do work
            if (!RemoveIFTrueFromInternet(item))
            {
                //add to list if working
                workinglist.TryAdd(item);
            }
            //no need to remove as it is removed from the list automatically
        }
        //this line will only reach after toProcess.CompleteAdding() and when items are consumed(verified)
        Console.WriteLine(String.Format("total : {0}", toProcess.Count));
    }
}

简而言之,它将在您添加条目时立即开始验证条目,并将成功的条目保存在单独的列表中

编辑

因为默认情况下GetConsumingEnumerable()的foreach循环不会结束,它会一直等待下一个元素,直到CompleteAdding()被调用。因此,我在包装器类中添加了Complete()方法,以便在我们推送所有元素后完成验证循环。

这个想法是继续添加验证元素到类中,让消费者循环并行地验证它们中的每一个,一旦完成,所有的元素都会调用Complete()来知道消费者没有更多的元素要添加,这样他们就可以在列表为空时终止foreach循环。

在您的代码中,元素的删除不是性能的实际问题,而是验证过程的同步循环,如果热点。从列表a中删除只需要花费几毫秒的时间,但是代码中昂贵的部分是blocking work at internet,所以如果我们可以使其并行,我们就可以节省一些宝贵的时间。

注意初始化的消费者线程的数量,但是我使用了线程池,但如果过度使用仍然可能影响性能。因此,根据机器的性能来决定一个数字。核心/处理器数量

关于BlockingCollection的更多信息