在IEnumerable实现中使用任务并行库来提高速度

本文关键字:并行 高速度 任务 IEnumerable 实现 | 更新日期: 2023-09-27 18:21:29

下面的代码是我试图优化的代码的简化版本。

void Main()
{
    var words = new List<string> {"abcd", "wxyz", "1234"};
    foreach (var character in SplitItOut(words))
    {
        Console.WriteLine (character);
    }
}
public IEnumerable<char> SplitItOut(IEnumerable<string> words)
{
    foreach (string word in words)
    {
        var characters = GetCharacters(word);
        foreach (char c in characters)
        {
            yield return c;
        }
    }
}
char[] GetCharacters(string word)
{
    Thread.Sleep(5000);
    return word.ToCharArray();
}

我无法更改方法SplitIt Out的签名。GetCharacters方法调用成本很高,但对线程是安全的。SplitItOut方法的输入可以包含100000多个条目,对GetCharacters()方法的单个调用可能需要大约200ms。它也可以抛出我可以忽略的异常。结果的顺序无关紧要。

在我的第一次尝试中,我使用TPL提出了以下实现,它大大加快了速度,但在处理完所有单词之前一直处于阻塞状态。

public IEnumerable<char> SplitItOut(IEnumerable<string> words)
{
    Task<char[][]> tasks = Task<char[][]>.Factory.StartNew(() =>
    {
        ConcurrentBag<char[]> taskResults = new ConcurrentBag<char[]>();
        Parallel.ForEach(words,
            word => 
            {
                taskResults.Add(GetCharacters(word));
            });
        return taskResults.ToArray();
    });
    foreach (var wordResult in tasks.Result)
    {
        foreach (var c in wordResult)
        {
            yield return c;
        }
    }
}

我正在寻找比这更好的方法SplitIt()的实现。降低处理时间是我的首要任务。

在IEnumerable实现中使用任务并行库来提高速度

如果我没看错你的问题,你并不是想加快从单词中创建字符的并行处理——你希望你的枚举表在准备好后立即生成每个。对于您目前拥有的实现(以及我目前看到的其他答案),SplitItOut将等待,直到所有单词都已发送到GetCharacters,并且在生成第一个单词之前返回所有结果。

在这种情况下,我喜欢把事情看作是把我的过程分成生产者和消费者。您的生产者线程将获取可用单词并调用GetCharacters,然后将结果转储到某个地方。使用者一旦准备好,就会向SplitItOut的调用方提供字符。实际上,消费者是SplitItOut的调用者。

我们可以使用BlockingCollection作为生成字符的一种方式,也可以作为放置结果的"地方"。我们可以使用ConcurrentBag作为放置尚未拆分的单词的位置:

static void Main()
        {
            var words = new List<string> { "abcd", "wxyz", "1234"};
            foreach (var character in SplitItOut(words))
            {
                Console.WriteLine(character);
            }
        }

        static char[] GetCharacters(string word)
        {
            Thread.Sleep(5000);
            return word.ToCharArray();
        }

没有更改您的mainGetCharacters-因为这些代表了您的限制(不能更改调用者,不能更改昂贵的操作)

        public static IEnumerable<char> SplitItOut(IEnumerable<string> words)
        {
            var source = new ConcurrentBag<string>(words);
            var chars = new BlockingCollection<char>();
            var tasks = new[]
                   {
                       Task.Factory.StartNew(() => CharProducer(source, chars)),
                       Task.Factory.StartNew(() => CharProducer(source, chars)),
                       //add more, tweak away, or use a factory to create tasks.
                       //measure before you simply add more!
                   };
            Task.Factory.ContinueWhenAll(tasks, t => chars.CompleteAdding());
            return chars.GetConsumingEnumerable();
        }

在这里,我们将SplitItOut方法更改为做四件事:

  1. 用我们希望拆分的所有单词初始化concurrentbag。(附带说明:如果你想按需枚举单词,你可以启动一个新任务来推送它们,而不是在构造函数中执行)
  2. 启动我们的字符"生产者"任务。你可以开始一个设定的数字,使用一个工厂,无论什么。我建议在测量之前不要疯狂地完成任务
  3. 当所有任务都完成时,向BlockingCollection发出我们已经完成的信号
  4. "消耗"所有生成的字符(我们自己做得很简单,只返回一个IEnumerable<char>,而不是foreach和yield,但如果你愿意,你可以做很长的路)

缺少的只是我们的生产者实现。我已经扩展了所有linq快捷方式以使其清晰,但它非常简单:

        private static void CharProducer(ConcurrentBag<string> words, BlockingCollection<char> output)
        {
            while(!words.IsEmpty)
            {
                string word;
                if(words.TryTake(out word))
                {
                    foreach (var c in GetCharacters(word))
                    {
                        output.Add(c);
                    }
                }
            }
        }

这只是

  1. 从ConcurrentBag中取出一个单词(除非它是空的——如果是空的,任务就完成了!)
  2. 调用昂贵的方法
  3. 将输出放入BlockingCollection

我将您的代码通过Visual Studio中内置的探查器,看起来Task的开销正在伤害您。我对它进行了轻微的重构,删除了Task,它稍微提高了性能。如果没有实际的算法和数据集,就很难准确地说出问题是什么,或者在哪里可以提高性能。如果你有VS Premium或Ultimate,有内置的评测工具可以帮你很多忙。你也可以参加ANTS的试用。

需要记住的一点是:不要过早地进行优化。如果您的代码执行得可以接受,那么不要向添加内容,可能会以牺牲可读性和可维护性为代价,使其更快。如果它的性能没有达到可接受的水平,在你开始摆弄它之前先对它进行评测

无论如何,这是我对你的算法的重构:

    public static IEnumerable<char> SplitItOut(IEnumerable<string> words)
    {
        var taskResults = new ConcurrentBag<char[]>();
        Parallel.ForEach(words, word => taskResults.Add(GetCharacters(word)));
        return taskResults.SelectMany(wordResult => wordResult);
    }