Parallel.ForEach missing items
本文关键字:items missing ForEach Parallel | 更新日期: 2023-09-27 18:14:25
我有以下代码:
HttpContext httpContext = HttpContext.Current;
RequestContext currentContext = RequestContextManager.CurrentContext;
ILifetimeScope currentSessionScope = PlatformContext.LifeTimeScope;
ConcurrentQueue<Exception> exceptions = new ConcurrentQueue<Exception>();
ConcurrentBag<ParallelCalculateObj> forEachResult = new ConcurrentBag<ParallelCalculateObj>();
ConcurrentBag<ParallelCalculateObj> testForEachPassResult = new ConcurrentBag<ParallelCalculateObj>();
ParallelLoopResult loopResult = Parallel.ForEach(applications, () =>
{
HttpContext.Current = httpContext;
RequestContextManager.SetCustomCurrentContext(currentContext);
PlatformContext.LifeTimeScope = currentSessionScope;
return new ParallelCalculateObj();
}, (application, pls, localObj) =>
{
try
{
// some code
}
catch (Exception e)
{
exceptions.Enqueue(e);
}
testForEachPassResult.Add(localObj);
return localObj;
}, forEachResult.Add);
其中CCD_ 1。执行以上代码后,我得到了forEachResult.Count = 2
和testForEachPassResult.Count = 3
为什么forEachResult集合不包含所有元素?没有例外,ParallelLoopResult.IsCompleted = true
。
有一件事可能有助于解决我的问题,那就是这三个项目是在两个线程下运行的:
- Item01->Thread.CurrentThread.ManagedThreadId为14
- Item02->Thread.CurrentThread.ManagedThreadId为10
- Item03->Thread.CurrentThread.ManagedThreadId为14
我认为您使用Parallel.ForEach
的方式不对。
您正在使用具有本地状态的重载。这个局部状态对于分区/线程是唯一的,但并不是每个迭代都有唯一的局部状态。
考虑将输入列表划分为N分区。则存在N局部状态。作为最后一步,您必须将这些N局部状态组合为最终值。通常,N将小于列表中的项目数,除非您使用更具体的重载之一,否则TPL将决定列表的分区方式。
由于您显然想用每次迭代的结果来填充某个列表,因此您的本地状态也应该是一个包含该特定分区的每次迭代结果的列表。对于最后的操作,您将所有列表合并为一个单独的列表:
Parallel.ForEach(
applications,
() => new List<ParallelCalculateObj>(),
(application, pls, localObj) =>
{
// do something
var obj = new ParallelCalculateObj { /* data of the iteration */ };
localObj.Add(obj);
return localObj;
},
localObj =>
{
foreach (var result in localObj)
{
forEachResult.Add(result);
}
});
请注意,如果您这样做,那么forEachResult
中的值顺序将与applications
中的项目顺序不对应。如果您想要,那么您必须使用ParallelLoopState
类的索引。
尝试
lock(testForEachPassResult){
testForEachPassResult.Add(localObj);
}
添加元素时,很可能没有列表的最新状态。请记住,列表可以同时从另一个线程添加另一个元素。因此,当它发生更改时,您可以将新项目添加到旧版本的testForEachPassResult中。
如果锁定该列表,则所有其他线程都将等待该列表解锁。
有三种方法,其中两种已经提到。
(1( 每个循环创建一个集合,并将其结果添加到此任务结果集合中。完成后,将合并所有任务结果。我将使用它作为默认值,因为if可以避免锁争用。
(2( 在所有任务之外创建一个公共集合。所有任务都写入集合,但写入此集合(添加项(必须与锁同步,因为它不是原子的。如果只有几百个项目要处理,但每个项目的处理时间是一秒或更长,我会使用这个。在这种情况下,锁定时间可以忽略不计。
(3( 在所有任务之外创建一个数组,并为每个任务指定一个区域来写入结果。即使所有任务都写入同一数组,它们写入的内存位置也不同。因此不需要同步。
(3(示例
// Source must be array or IList.
var source = Enumerable.Range(0, 100000).ToArray();
// Partition the entire source array.
var rangePartitioner = Partitioner.Create(0, source.Length);
double[] results = new double[source.Length];
// Loop over the partitions in parallel.
Parallel.ForEach(rangePartitioner, (range, loopState) =>
{
// Loop over each range element without a delegate invocation.
for (int i = range.Item1; i < range.Item2; i++)
{
results[i] = source[i] * Math.PI;
}
});
代码来源:https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/custom-partitioners-for-plinq-and-tpl#configuring-每个的静态范围分割器