Parallel.Foreach + yield return?

本文关键字:return yield Foreach Parallel | 更新日期: 2023-09-27 18:17:06

我想用这样的并行循环处理一些东西:

public void FillLogs(IEnumerable<IComputer> computers)
{
    Parallel.ForEach(computers, cpt=>
    {
        cpt.Logs = cpt.GetRawLogs().ToList();
    });
}

好的,它工作得很好。但是如何做,如果我想FillLogs方法返回一个IEnumerable ?

public IEnumerable<IComputer> FillLogs(IEnumerable<IComputer> computers)
{
    Parallel.ForEach(computers, cpt=>
    {
        cpt.Logs = cpt.GetRawLogs().ToList();
        yield return cpt // KO, don't work
    });
}

编辑

似乎不可能……但我用的是这样的:

public IEnumerable<IComputer> FillLogs(IEnumerable<IComputer> computers)
{
    return computers.AsParallel().Select(cpt => cpt);
}

但是我放cpt.Logs = cpt.GetRawLogs().ToList();指令的地方

Parallel.Foreach + yield return?

简短版-不,这是不可能通过迭代器块;较长的版本可能涉及调用者的迭代器线程(执行dequeue)和并行工作线程(执行enqueue)之间的同步队列/脱队列;但是作为旁注-日志通常是io绑定的,并且并行化io绑定的东西通常不是很好。

如果调用者要花一些时间来消耗每个日志,那么一次只处理一个日志的方法可能有一些优点,但是可以在调用者正在消耗前一个日志时执行;即yield之前开始下一项Task ,并在yield之后等待完成…但这还是很复杂。作为一个简化的例子:

static void Main()
{
    foreach(string s in Get())
    {
        Console.WriteLine(s);
    }
}
static IEnumerable<string> Get() {
    var source = new[] {1, 2, 3, 4, 5};
    Task<string> outstandingItem = null;
    Func<object, string> transform = x => ProcessItem((int) x);
    foreach(var item in source)
    {
        var tmp = outstandingItem;
        // note: passed in as "state", not captured, so not a foreach/capture bug
        outstandingItem = new Task<string>(transform, item);
        outstandingItem.Start();
        if (tmp != null) yield return tmp.Result;
    }
    if (outstandingItem != null) yield return outstandingItem.Result;
}
static string ProcessItem(int i)
{
    return i.ToString();
}

我不想冒犯你,但也许你们缺乏理解。Parallel.ForEach意味着TPL将根据几个线程中的可用硬件运行foreach。但这意味着,这两项工作可以并行进行!yield return使您有机会从列表(或其他什么)中获得一些值,并在需要时逐一返回它们。它避免了首先查找符合条件的所有项,然后遍历它们的需要。这确实是一个性能优势,但不能并行完成。

虽然这个问题很老,但我已经设法做了一些有趣的事情。

class Program
{
    static void Main(string[] args)
    {
        foreach (var message in GetMessages())
        {
            Console.WriteLine(message);
        }
    }

    // Parallel yield
    private static IEnumerable<string> GetMessages()
    {
        int total = 0;
        bool completed = false;
        var batches = Enumerable.Range(1, 100).Select(i => new Computer() { Id = i });
        var qu = new ConcurrentQueue<Computer>();
        Task.Run(() =>
        {
            try
            {
                Parallel.ForEach(batches,
                    () => 0,
                    (item, loop, subtotal) =>
                    {
                        Thread.Sleep(1000);
                        qu.Enqueue(item);
                        return subtotal + 1;
                    },
                    result => Interlocked.Add(ref total, result));
            }
            finally
            {
                completed = true;
            }
        });
        int current = 0;
        while (current < total || !completed)
        {
            SpinWait.SpinUntil(() => current < total || completed);
            if (current == total) yield break;
            current++;
            qu.TryDequeue(out Computer computer);
            yield return $"Completed {computer.Id}";
        }
    }
}
public class Computer
{
    public int Id { get; set; }
}

与Koray的答案相比,这个确实使用了所有的CPU内核。

您可以使用以下扩展方法

public static class ParallelExtensions
{
    public static IEnumerable<T1> OrderedParallel<T, T1>(this IEnumerable<T> list, Func<T, T1> action)
    {
        var unorderedResult = new ConcurrentBag<(long, T1)>();
        Parallel.ForEach(list, (o, state, i) =>
        {
            unorderedResult.Add((i, action.Invoke(o)));
        });
        var ordered = unorderedResult.OrderBy(o => o.Item1);
        return ordered.Select(o => o.Item2);
    }
}
使用

:

public void FillLogs(IEnumerable<IComputer> computers)
{
    cpt.Logs = computers.OrderedParallel(o => o.GetRawLogs()).ToList();
}

            Queue<string> qu = new Queue<string>();
            bool finished = false;
            Task.Factory.StartNew(() =>
            {
                Parallel.ForEach(get_list(), (item) =>
                {
                    string itemToReturn = heavyWorkOnItem(item);         
                    lock (qu)
                       qu.Enqueue(itemToReturn );                        
                });
                finished = true;
            });
            while (!finished)
            {
                lock (qu)
                    while (qu.Count > 0)
                        yield return qu.Dequeue();
                //maybe a thread sleep here?
            }
编辑:

我觉得这样比较好:

        public static IEnumerable<TOutput> ParallelYieldReturn<TSource, TOutput>(this IEnumerable<TSource> source, Func<TSource, TOutput> func)
        {
            ConcurrentQueue<TOutput> qu = new ConcurrentQueue<TOutput>();
            bool finished = false;
            AutoResetEvent re = new AutoResetEvent(false);
            Task.Factory.StartNew(() =>
            {
                Parallel.ForEach(source, (item) =>
                {
                    qu.Enqueue(func(item));
                    re.Set();
                });
                finished = true;
                re.Set();
            });
            while (!finished)
            {
                re.WaitOne();
                while (qu.Count > 0)
                {
                    TOutput res;
                    if (qu.TryDequeue(out res))
                        yield return res;
                }
            }
        }   

Edit2:我同意简短的No答案。这个代码是无用的;你不能打破yield循环