List< T>.RemoveAll作为并行
本文关键字:并行 RemoveAll List | 更新日期: 2023-09-27 18:12:44
我想知道做toProcess.RemoveAll
的替代方案,但并行。今天我的代码像我的示例一样工作得很好,但是是顺序的,我希望是并行的。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace ParallelTest
{
using System.Threading;
using System.Threading.Tasks;
class Program
{
static void Main(string[] args)
{
List<VerifySomethingFromInternet> foo = new List<VerifySomethingFromInternet>();
foo.Add(new VerifySomethingFromInternet(@"id1", true));
foo.Add(new VerifySomethingFromInternet(@"id2", false));
foo.Add(new VerifySomethingFromInternet(@"id3", true));
foo.Add(new VerifySomethingFromInternet(@"id4", false));
foo.Add(new VerifySomethingFromInternet(@"id5", true));
foo.Add(new VerifySomethingFromInternet(@"id6", false));
DoSomethingFromIntert bar = new DoSomethingFromIntert();
bar.DoesWork(foo);
Console.ReadLine();
}
}
public class DoSomethingFromIntert
{
bool RemoveIFTrueFromInternet(VerifySomethingFromInternet vsfi)
{
Console.WriteLine(String.Format("Identification : {0} - Thread : {1}", vsfi.Identification, Thread.CurrentThread.ManagedThreadId));
// Do some blocking work at internet
return vsfi.IsRemovable;
}
public void DoesWork(List<VerifySomethingFromInternet> toProcess)
{
Console.WriteLine(String.Format("total : {0}", toProcess.Count));
//Remove all true return
toProcess.RemoveAll(f => this.RemoveIFTrueFromInternet(f));
Console.WriteLine(String.Format("total : {0}", toProcess.Count));
}
}
public class VerifySomethingFromInternet
{
public VerifySomethingFromInternet(string id, bool remove)
{
this.Identification = id;
this.IsRemovable = remove;
}
public string Identification { get; set; }
public bool IsRemovable { get; set; }
}
}
var newList = toProcess.AsParallel ()
.Where (f => !this.RemoveIFTrueFromInternet(f))
.ToList ();
toProcess = newList;
可能这回答了你的问题,但我不确定它是否真的更快。试着衡量一下。
注意,这可能会改变列表中元素的顺序。如果你关心顺序,在AsParallel
后面加上AsOrdered
。(感谢weston的[隐式]提示)。
List<T>
不是线程安全的,所以没有办法与这种类型的列表并行。
你可以使用线程安全的ConcurrentBag
代替,但显然没有RemoveAll
方法。
您还可以将列表转换为数组,编辑该数组,并将其再次传递给list。
我试着重构一下你的代码
我使用BlockingCollection
来实现一个生产者消费者场景
这不是并行删除,但它可以通过并行处理来解决你的问题,试试吧,你可能会喜欢它
class Program
{
static void Main(string[] args)
{
DoSomethingFromIntert bar = new DoSomethingFromIntert();
bar.Verify(@"id1", true);
bar.Verify(@"id2", false);
bar.Verify(@"id3", true);
bar.Verify(@"id4", false);
bar.Verify(@"id5", true);
bar.Verify(@"id6", false);
bar.Complete();
Console.ReadLine();
}
}
public class DoSomethingFromIntert
{
BlockingCollection<VerifySomethingFromInternet> toProcess = new BlockingCollection<VerifySomethingFromInternet>();
ConcurrentBag<VerifySomethingFromInternet> workinglist = new ConcurrentBag<VerifySomethingFromInternet>();
public DoSomethingFromIntert()
{
//init four consumers you may choose as many as you want
ThreadPool.QueueUserWorkItem(DoesWork);
ThreadPool.QueueUserWorkItem(DoesWork);
ThreadPool.QueueUserWorkItem(DoesWork);
ThreadPool.QueueUserWorkItem(DoesWork);
}
public void Verify(string param, bool flag)
{
//add to the processing list
toProcess.TryAdd(new VerifySomethingFromInternet(param, flag));
}
public void Complete()
{
//mark producer as complete and let the threads exit when finished verifying
toProcess.CompleteAdding();
}
bool RemoveIFTrueFromInternet(VerifySomethingFromInternet vsfi)
{
Console.WriteLine(String.Format("Identification : {0} - Thread : {1}", vsfi.Identification, Thread.CurrentThread.ManagedThreadId));
// Do some blocking work at internet
return vsfi.IsRemovable;
}
private void DoesWork(object state)
{
Console.WriteLine(String.Format("total : {0}", toProcess.Count));
foreach (var item in toProcess.GetConsumingEnumerable())
{
//do work
if (!RemoveIFTrueFromInternet(item))
{
//add to list if working
workinglist.TryAdd(item);
}
//no need to remove as it is removed from the list automatically
}
//this line will only reach after toProcess.CompleteAdding() and when items are consumed(verified)
Console.WriteLine(String.Format("total : {0}", toProcess.Count));
}
}
简而言之,它将在您添加条目时立即开始验证条目,并将成功的条目保存在单独的列表中
编辑
因为默认情况下GetConsumingEnumerable()
的foreach循环不会结束,它会一直等待下一个元素,直到CompleteAdding()
被调用。因此,我在包装器类中添加了Complete()
方法,以便在我们推送所有元素后完成验证循环。
这个想法是继续添加验证元素到类中,让消费者循环并行地验证它们中的每一个,一旦完成,所有的元素都会调用Complete()来知道消费者没有更多的元素要添加,这样他们就可以在列表为空时终止foreach循环。
在您的代码中,元素的删除不是性能的实际问题,而是验证过程的同步循环,如果热点。从列表a中删除只需要花费几毫秒的时间,但是代码中昂贵的部分是blocking work at internet
,所以如果我们可以使其并行,我们就可以节省一些宝贵的时间。
关于BlockingCollection的更多信息