实现并行.while循环中的Foreach

本文关键字:Foreach 循环 并行 while 实现 | 更新日期: 2023-09-27 18:18:18

我有一个场景,我需要运行并行。while循环中的Foreach。我需要从处理将如何发生的角度来理解这个实现的影响。我将有一个像这样的实现

 ConcurrentQueue<MyTable> queue = new ConcurrentQueue<MyTable>();

在这里,我最初在队列中添加了很多项目,但在执行时也可以在队列中添加更多项目。

while(true)
{
    Parallel.Foreach(queue, (myTable) => {some processing});
    Sleep(sometime);
}

每次一个项目将被取消排队,新的线程将产生与它一起工作,同时新的项目将被添加,我需要保持一个无限while循环。

现在,我需要理解,由于并发队列是线程安全的,我认为每个项目只会处理一次,尽管在foreach之上,但我不确定的是,会有多个线程的foreach本身将产生子线程或foreach的单个副本将在while循环内运行。我不知道foreach本身是如何实现的

实现并行.while循环中的Foreach

我有一个场景,我需要运行并行。在while循环内的Foreach

我不认为你知道。您希望并行处理新条目,但我认为这不是最好的方法。

我认为最好的方法是使用来自TPL数据流的ActionBlock。当没有项目要处理时,它不会浪费CPU或线程,如果您设置了它的MaxDegreeOfParallelism,它将并行处理项目:

ActionBlock<MyTable> actionBlock = new ActionBlock<MyTable>(
    myTable => /* some processing */,
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
    });

...
actionBlock.Post(someTable);

如果你不想或不能(只有。net 4.5)使用TPL数据流,另一个选择是使用单个Parallel.Foreach()(没有while)与BlockingCollectionGetConsumingPartitioner() (而不是 GetConsumingEnumerable() !)。

使用这个,当没有项目要处理时,Parallel.Foreach()线程将被阻塞,但也不会有任何处理延迟(像Sleep()造成的延迟):

BlockingCollection<MyTable> queue = new BlockingCollection<MyTable>();
...
Parallel.ForEach(
    queue.GetConsumingPartitioner(), myTable => /* some processing */);
...
queue.Add(someTable);

我认为每个项目将只处理一次,尽管上面列出了每个项目,但我不确定

这就是你应该使用上述选项之一的原因之一,因为它们意味着你不需要了解它们如何工作的太多细节,它们就是工作。