`Parallel.ForEach `按照定义的顺序执行最后一步

本文关键字:顺序 执行 最后一步 Parallel ForEach 定义 | 更新日期: 2023-09-27 18:27:29

我正在寻找一种"简洁"而有效的方法来实现长步骤1(可以并行)和需要按原始顺序的步骤2的组合(如果可能的话,最大限度地减少RAM中保存的第一步骤的数据量),同时允许第二步在第一个对象的步骤1的数据可用时立即开始,以及用于进一步数据的步骤2。

为了使其更加清晰,我需要压缩大量图像(缓慢-步骤1),然后通过网络连接按顺序发送每个图像(步骤2)。在任何阶段限制RAM中准备好的压缩数据块的数量也很重要,因此,例如,如果发送1000个图像,我希望将"已完成"但未发送的图像的数量限制为(例如)使用的线程/处理器的数量。

我已经做了一个"手写"版本,使用了一组Task对象,但看起来很混乱,我相信其他人一定有类似的需求,所以有更"标准"的方法吗?理想情况下,我希望Parallel.ForEach上有一个变体,有两个委托-一个用于步骤1,另一个用于第2步,我希望其中一个标准重写(如包含"localFinal"参数的重写)可能会有所帮助,但事实证明,这些最后阶段是"每个线程",而不是"每个委托"。

有人能给我指出一种现有的实现这一目标的巧妙方法吗?

`Parallel.ForEach `按照定义的顺序执行最后一步

您可以使用Plinq(与WithDegreeOfParallelism()一起在第一阶段限制并发)的组合,以及用于已完成块的BlockingCollection。此外,请注意,它使用AsOrdered()来保留原始顺序。

下面的示例演示。对于您的实际应用程序,您应该将此处显示的int工作项替换为您的工作项类型——文件名或包含与每个工作项相关信息的类。

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Demo
{
    static class Program
    {
        static void Main()
        {
            int maxThreads = 4;
            int maxOutputQueueSize = 10;
            var workItems = Enumerable.Range(1, 100); // Pretend these are your files
            var outputQueue = new BlockingCollection<int>(maxOutputQueueSize);
            var worker = Task.Run(() => output(outputQueue));
            var parallelWorkItems = 
                workItems
                .AsParallel()
                .AsOrdered()
                .WithDegreeOfParallelism(maxThreads)
                .WithMergeOptions(ParallelMergeOptions.NotBuffered)
                .Select(process);
            foreach (var item in parallelWorkItems)
                outputQueue.Add(item);
            outputQueue.CompleteAdding();
            worker.Wait();
            Console.WriteLine("Done.");
        }
        static int process(int value) // Pretend that this compresses the data.
        {
            Console.WriteLine($"Worker {Thread.CurrentThread.ManagedThreadId} is processing {value}");
            Thread.Sleep(250);  // Simulate slow operation.
            return value; // Return updated work item.
        }
        static void output(BlockingCollection<int> queue)
        {
            foreach (var item in queue.GetConsumingEnumerable())
                Console.WriteLine($"Output is processing {item}");
            Console.WriteLine("Finished outputting.");
        }
    }
}

注意如何限制输入队列处理(通过WithDegreeOfParallelism)和输出队列的大小(使用maxOutputQueueSize参数)。

或者,如果您使用的是.Net 4.5或更高版本,您可以查看TPL数据流库,它对这类东西有很多支持。如果可以的话,我建议你使用它——但这里的答案有点太多了。