包含线程的处理列表

本文关键字:列表 处理 线程 包含 | 更新日期: 2023-09-27 17:56:54

我想处理一个包含 5000 个项目的列表。对于每个项目,该过程可能非常快(1秒)或需要很长时间(>1分钟)。但我想以最快的方式处理此列表。

我无法将这 5000 个项目放在 .NET ThreadPool 中,而且我需要知道何时处理所有项目,所以我正在考虑拥有特定数量的线程并执行以下操作:

foreach(var item in items)
{
    // wait for a Thread to be available
    // give the item to process to the Thread
}

但是在 C# 中执行此操作的最简单方法是什么?我应该使用线程,还是可以使用一些更高级别的类?

包含线程的处理列表

我会从Parallel.ForEach开始,衡量你的表现。这是一种简单而强大的方法,对于通用调度程序来说,调度工作做得相当不错。

Parallel.ForEach(items, i => { /* your code here */ });

我无法将这 5000 个项目放在 .NET 线程池中

你也不想。创建线程的成本相对较高。上下文切换需要时间。如果你说 8 个内核处理 5000 个线程,那么执行时间的一个有意义的部分将是上下文切换。

要进行并行处理,这是要使用的结构

Parallel.ForEach(items, (item) => 
{
       ....
}

如果你不想使线程池过载,你可以使用ParallelOptions

var po = new ParallelOptions
{
     MaxDegreeOfParallelism = 5
}
Parallel.ForEach(items, po,(item) => 
{
           ....
}

我同意推荐Parallel.ForEach的答案。在不知道所有细节(比如循环中发生了什么)的情况下,我不能说 100%。只要循环中的迭代没有做任何相互冲突的事情(比如对一些非线程安全的其他对象的并发操作),那么它应该没问题。

您在评论中提到它抛出了一个异常。这可能是一个问题,因为如果一次迭代引发异常,则循环将终止,使您的任务仅部分完成。

若要避免这种情况,请在循环的每次迭代中处理异常。例如

var exceptions = new ConcurrentQueue<Exception>();
Parallel.ForEach(items, i => 
{ 
    try
    {
        //Your code to do whatever
    }
    catch(Exception ex)
    {
        exceptions.Enqueue(ex);
    }
});

通过使用ConcurrentQueue任何迭代都可以安全地添加自己的异常。完成后,您将有一个例外列表。现在,您可以决定如何处理它们。您可以抛出一个新的异常:

if (exceptions.Count > 0) throw new AggregateException(exceptions);

或者,如果有一些东西可以唯一标识您可以执行的每个item(例如)

var exceptions = new ConcurrentDictionary<Guid, Exception>();

然后当引发异常时,

exceptions.TryAdd(item.Id, ex); //making up the Id property

现在,您确切地知道哪些项目成功,哪些项目失败。