如何防止 Parallel.ForEach 循环在运行时更改任务数

本文关键字:任务 运行时 何防止 Parallel ForEach 循环 | 更新日期: 2023-09-27 18:36:38

我正在使用Parallel.ForEach循环来做一些工作,我用这样的localInit初始化它:

localInit: () => new
{
    foo = new Foo(),
    bars = CreateBars(),
}

根据MSDN:

localInit,或初始化线程局部变量的函数。 对于其中 执行 Parallel.ForEach 操作。我们的示例初始化 线程局部变量为零。

所以我尝试像这样使用它,但我观察到循环不断杀死并创建新任务,导致频繁调用localInit。这是我的选择适得其反,不能按预期工作。

我认为当Parallel.ForEach创建例如四个分区时,它会使它们保持活动状态,直到它迭代所有项目,但它没有。它呼唤localFinallylocalInit几个百年,为一个有几千件物品的收藏。怎么会这样?

可以以某种方式防止这种行为吗?我真的希望节省一些资源,但它并没有真正让我。


循环如下所示:

var parallelLoopResult = Parallel.ForEach
(
    source: items,
    parallelOptions: parallelOptions,
    localInit: () => new
    {
        foo = new Foo(),
        bars = CreateBars(),
    },
    body: (item, loopState, i, local) =>
    {
        parallelOptions.CancellationToken.ThrowIfCancellationRequested();
        var results = local.bars.Select(x => ...).ToList().
        ....
        return local;
    },
    localFinally: local =>
    {
        local.foo.Dispose();
        lock (aggregateLock)
        {
            ... process transformed bars
        }
    }
);

并行选项:

var parallelOptions = new ParallelOptions
{
    CancellationToken = cancellationTokenSource.Token,
#if DEBUG
    MaxDegreeOfParallelism = 1
    //MaxDegreeOfParallelism = Environment.ProcessorCount
#else
    MaxDegreeOfParallelism = Environment.ProcessorCount
#endif
};

如何防止 Parallel.ForEach 循环在运行时更改任务数

如果我正确理解代码,Parallel.ForEach()每隔几百毫秒重新启动一次Task。这意味着,如果每次迭代都是实质性的(通常应该是这样),您将获得大量Task,从而获得大量对localInitlocalFinally的调用。这样做的原因是同一进程中也使用相同的ThreadPool的其他代码的公平性。

我认为没有办法改变Parallel.ForEach()的这种行为.我认为解决这个问题的一个好方法是编写自己的简单版本Parallel.ForEach()。考虑到您可以利用Partitioner<T>并根据您需要的Parallel.ForEach()功能,它可能相对简单。例如,类似这样的内容:

public static void MyParallelForEach<TSource, TLocal>(
    IEnumerable<TSource> source, int degreeOfParallelism,
    Func<TLocal> localInit, Func<TSource, TLocal, TLocal> body, Action<TLocal> localFinally)
{
    var partitionerSource = Partitioner.Create(source).GetDynamicPartitions();
    Action taskAction = () =>
    {
        var localState = localInit();
        foreach (var item in partitionerSource)
        {
            localState = body(item, localState);
        }
        localFinally(localState);
    };
    var tasks = new Task[degreeOfParallelism - 1];
    for (int i = 0; i < degreeOfParallelism - 1; i++)
    {
        tasks[i] = Task.Run(taskAction);
    }
    taskAction();
    Task.WaitAll(tasks);
}

每个线程执行柱只创建一次。但是你知道做了多少次并行执行吗?并行执行引擎可以自行决定启动任意数量的并行执行。

如果要限制并行执行,请使用 MaxDegreeOfParallelism 属性。这将对一次创建的柱数设置上限。它仍然不会控制创建的总条形图,而且总条形图可能比您现在预期的要少。

如果要进行显式控制,请手动创建任务。

此重载不是唯一的重载,因此您可以尝试以下操作:

var bars = CreateBars();
Parallel.Foreach(bars, b => { /* your action here */};

但是,如果您确实想为每个线程创建 bars 的副本,则可以使用 LINQ 中的一些复制方法(假设您的 bar 是 IEnumerable<T> 变量):

var bars = CreateBars();
localInit: () => new
{
    foo = new Foo(),
    bars = new List<IBar>(bars),
}