并行Linq -返回返回的第一个结果
本文关键字:返回 第一个 结果 并行 Linq | 更新日期: 2023-09-27 18:14:08
我使用PLINQ来运行一个测试串行端口以确定它们是否是GPS设备的函数。
一些串口立即被发现是一个有效的GPS。在本例中,我希望返回完成测试的第一个对象。我不想等剩下的结果。
我可以用PLINQ做到这一点吗?还是我必须安排一批任务并等待一个返回?
PLINQ在这里可能还不够。虽然您可以使用.First
,但在。net 4中,这将导致它按顺序运行,这违背了目的。(请注意,这将在。net 4.5中得到改进。)
Task<Location>
,然后使用Task。WaitAny等待第一个成功的操作。
这提供了一种简单的方法来安排一堆"任务",然后只使用第一个结果。
在过去的几天里,我一直在断断续续地思考这个问题,我找不到一个内置的PLINQ方法来在c# 4.0中做到这一点。对于这个使用FirstOrDefault的问题,公认的答案在完整的PLINQ查询完成之前不返回值,并且仍然返回(有序的)第一个结果。下面的极端示例展示了这种行为:
var cts = new CancellationTokenSource();
var rnd = new ThreadLocal<Random>(() => new Random());
var q = Enumerable.Range(0, 11).Select(x => x).AsParallel()
.WithCancellation(cts.Token).WithMergeOptions( ParallelMergeOptions.NotBuffered).WithDegreeOfParallelism(10).AsUnordered()
.Where(i => i % 2 == 0 )
.Select( i =>
{
if( i == 0 )
Thread.Sleep(3000);
else
Thread.Sleep(rnd.Value.Next(50, 100));
return string.Format("dat {0}", i).Dump();
});
cts.CancelAfter(5000);
// waits until all results are in, then returns first
q.FirstOrDefault().Dump("result");
我没有看到立即获得第一个可用结果的内置方法,但我能够提出两个变通方法。
第一个创建Task来完成工作并返回Task,从而快速完成PLINQ查询。可以将生成的任务传递给WaitAny,以便在第一个结果可用时立即获得它:
var cts = new CancellationTokenSource();
var rnd = new ThreadLocal<Random>(() => new Random());
var q = Enumerable.Range(0, 11).Select(x => x).AsParallel()
.WithCancellation(cts.Token).WithMergeOptions( ParallelMergeOptions.NotBuffered).WithDegreeOfParallelism(10).AsUnordered()
.Where(i => i % 2 == 0 )
.Select( i =>
{
return Task.Factory.StartNew(() =>
{
if( i == 0 )
Thread.Sleep(3000);
else
Thread.Sleep(rnd.Value.Next(50, 100));
return string.Format("dat {0}", i).Dump();
});
});
cts.CancelAfter(5000);
// returns as soon as the tasks are created
var ts = q.ToArray();
// wait till the first task finishes
var idx = Task.WaitAny( ts );
ts[idx].Result.Dump("res");
这可能是一种糟糕的方法。因为PLINQ查询的实际工作只是一个非常快的Task.Factory。StartNew,使用PLINQ根本没有意义。在IEnumerable上一个简单的 .Select( i => Task.Factory.StartNew( ...
更干净,可能更快。
第二种解决方法使用队列(BlockingCollection),并在计算结果后将结果插入该队列:
var cts = new CancellationTokenSource();
var rnd = new ThreadLocal<Random>(() => new Random());
var q = Enumerable.Range(0, 11).Select(x => x).AsParallel()
.WithCancellation(cts.Token).WithMergeOptions( ParallelMergeOptions.NotBuffered).WithDegreeOfParallelism(10).AsUnordered()
.Where(i => i % 2 == 0 )
.Select( i =>
{
if( i == 0 )
Thread.Sleep(3000);
else
Thread.Sleep(rnd.Value.Next(50, 100));
return string.Format("dat {0}", i).Dump();
});
cts.CancelAfter(5000);
var qu = new BlockingCollection<string>();
// ForAll blocks until PLINQ query is complete
Task.Factory.StartNew(() => q.ForAll( x => qu.Add(x) ));
// get first result asap
qu.Take().Dump("result");
使用此方法,工作是使用PLINQ完成的,并且blockingcollection的Take()将在PLINQ查询插入后立即返回第一个结果。
虽然这会产生期望的结果,但我不确定它比使用更简单的Tasks + WaitAny
经过进一步审查,显然可以使用FirstOrDefault
来解决这个问题。PLINQ默认情况下不保留排序,如果查询没有缓存,将立即返回。
要在。net 4.0中完全使用PLINQ完成此任务:
SerialPorts. // Your IEnumerable of serial ports
AsParallel().AsUnordered(). // Run as an unordered parallel query
Where(IsGps). // Matching the predicate IsGps (Func<SerialPort, bool>)
Take(1). // Taking the first match
FirstOrDefault(); // And unwrap it from the IEnumerable (or null if none are found
关键是不要使用像First或FirstOrDefault这样的有序求值,除非您指定您只关心查找一个。