Parallel.Foreach + yield return?
本文关键字:return yield Foreach Parallel | 更新日期: 2023-09-27 18:17:06
我想用这样的并行循环处理一些东西:
public void FillLogs(IEnumerable<IComputer> computers)
{
Parallel.ForEach(computers, cpt=>
{
cpt.Logs = cpt.GetRawLogs().ToList();
});
}
好的,它工作得很好。但是如何做,如果我想FillLogs方法返回一个IEnumerable ?
public IEnumerable<IComputer> FillLogs(IEnumerable<IComputer> computers)
{
Parallel.ForEach(computers, cpt=>
{
cpt.Logs = cpt.GetRawLogs().ToList();
yield return cpt // KO, don't work
});
}
编辑
似乎不可能……但我用的是这样的:
public IEnumerable<IComputer> FillLogs(IEnumerable<IComputer> computers)
{
return computers.AsParallel().Select(cpt => cpt);
}
但是我放cpt.Logs = cpt.GetRawLogs().ToList();
指令的地方
简短版-不,这是不可能通过迭代器块;较长的版本可能涉及调用者的迭代器线程(执行dequeue)和并行工作线程(执行enqueue)之间的同步队列/脱队列;但是作为旁注-日志通常是io绑定的,并且并行化io绑定的东西通常不是很好。
如果调用者要花一些时间来消耗每个日志,那么一次只处理一个日志的方法可能有一些优点,但是可以在调用者正在消耗前一个日志时执行;即在yield
之前开始下一项的Task
,并在yield
之后等待完成…但这还是很复杂。作为一个简化的例子:
static void Main()
{
foreach(string s in Get())
{
Console.WriteLine(s);
}
}
static IEnumerable<string> Get() {
var source = new[] {1, 2, 3, 4, 5};
Task<string> outstandingItem = null;
Func<object, string> transform = x => ProcessItem((int) x);
foreach(var item in source)
{
var tmp = outstandingItem;
// note: passed in as "state", not captured, so not a foreach/capture bug
outstandingItem = new Task<string>(transform, item);
outstandingItem.Start();
if (tmp != null) yield return tmp.Result;
}
if (outstandingItem != null) yield return outstandingItem.Result;
}
static string ProcessItem(int i)
{
return i.ToString();
}
我不想冒犯你,但也许你们缺乏理解。Parallel.ForEach
意味着TPL将根据几个线程中的可用硬件运行foreach。但这意味着,这两项工作可以并行进行!yield return
使您有机会从列表(或其他什么)中获得一些值,并在需要时逐一返回它们。它避免了首先查找符合条件的所有项,然后遍历它们的需要。这确实是一个性能优势,但不能并行完成。
虽然这个问题很老,但我已经设法做了一些有趣的事情。
class Program
{
static void Main(string[] args)
{
foreach (var message in GetMessages())
{
Console.WriteLine(message);
}
}
// Parallel yield
private static IEnumerable<string> GetMessages()
{
int total = 0;
bool completed = false;
var batches = Enumerable.Range(1, 100).Select(i => new Computer() { Id = i });
var qu = new ConcurrentQueue<Computer>();
Task.Run(() =>
{
try
{
Parallel.ForEach(batches,
() => 0,
(item, loop, subtotal) =>
{
Thread.Sleep(1000);
qu.Enqueue(item);
return subtotal + 1;
},
result => Interlocked.Add(ref total, result));
}
finally
{
completed = true;
}
});
int current = 0;
while (current < total || !completed)
{
SpinWait.SpinUntil(() => current < total || completed);
if (current == total) yield break;
current++;
qu.TryDequeue(out Computer computer);
yield return $"Completed {computer.Id}";
}
}
}
public class Computer
{
public int Id { get; set; }
}
与Koray的答案相比,这个确实使用了所有的CPU内核。
您可以使用以下扩展方法
public static class ParallelExtensions
{
public static IEnumerable<T1> OrderedParallel<T, T1>(this IEnumerable<T> list, Func<T, T1> action)
{
var unorderedResult = new ConcurrentBag<(long, T1)>();
Parallel.ForEach(list, (o, state, i) =>
{
unorderedResult.Add((i, action.Invoke(o)));
});
var ordered = unorderedResult.OrderBy(o => o.Item1);
return ordered.Select(o => o.Item2);
}
}
使用:
public void FillLogs(IEnumerable<IComputer> computers)
{
cpt.Logs = computers.OrderedParallel(o => o.GetRawLogs()).ToList();
}
Queue<string> qu = new Queue<string>();
bool finished = false;
Task.Factory.StartNew(() =>
{
Parallel.ForEach(get_list(), (item) =>
{
string itemToReturn = heavyWorkOnItem(item);
lock (qu)
qu.Enqueue(itemToReturn );
});
finished = true;
});
while (!finished)
{
lock (qu)
while (qu.Count > 0)
yield return qu.Dequeue();
//maybe a thread sleep here?
}
编辑:我觉得这样比较好:
public static IEnumerable<TOutput> ParallelYieldReturn<TSource, TOutput>(this IEnumerable<TSource> source, Func<TSource, TOutput> func)
{
ConcurrentQueue<TOutput> qu = new ConcurrentQueue<TOutput>();
bool finished = false;
AutoResetEvent re = new AutoResetEvent(false);
Task.Factory.StartNew(() =>
{
Parallel.ForEach(source, (item) =>
{
qu.Enqueue(func(item));
re.Set();
});
finished = true;
re.Set();
});
while (!finished)
{
re.WaitOne();
while (qu.Count > 0)
{
TOutput res;
if (qu.TryDequeue(out res))
yield return res;
}
}
}
Edit2:我同意简短的No答案。这个代码是无用的;你不能打破yield循环