平行的.重I/O操作中的ForEach与异步Forloop

本文关键字:ForEach 异步 Forloop 操作 | 更新日期: 2023-09-27 18:03:27

我想比较两种理论情景。为了解决这个问题,我简化了这些例子。但基本上这是典型的生产者和消费者场景。(我关注的是消费者)。

我有一个大的Queue<string> dataQueue,我必须传输到多个客户端。

让我们从简单的例子开始:

 class SequentialBlockingCase
 {
    public static Queue<string> DataQueue = new Queue<string>();
    private static List<string> _destinations = new List<string>();
    /// <summary>
    /// Is the main function that is run in its own thread
    /// </summary>
    private static void Run()
    {
        while (true)
        {
            if (DataQueue.Count > 0)
            {
                string data = DataQueue.Dequeue();
                foreach (var destination in _destinations)
                {
                    SendDataToDestination(destination, data);
                }
            }
            else
            {
                Thread.Sleep(1);
            }
        }
    }
    private static void SendDataToDestination(string destination, string data)
    {
        //TODO: Send data using http post, instead simulate the send
        Thread.Sleep(200);
    }
}
}

现在这个设置工作得很好。它坐在那里轮询Queue,当有数据要发送时,它将其发送到所有目的地。

问题:

  • 如果其中一个目的地不可用或缓慢,它会影响所有其他目的地。
  • 在并行执行的情况下不使用多线程
  • 阻塞到每个目的地的每次传输。

这是我的第二次尝试:

 class ParalleBlockingCase
{
    public static Queue<string> DataQueue = new Queue<string>();
    private static List<string> _destinations = new List<string>();
    /// <summary>
    /// Is the main function that is run in its own thread
    /// </summary>
    private static void Run()
    {
        while (true)
        {
            if (DataQueue.Count > 0)
            {
                string data = DataQueue.Dequeue();
                Parallel.ForEach(_destinations, destination =>
                {
                    SendDataToDestination(destination, data);
                });
            }
            else
            {
                Thread.Sleep(1);
            }
        }
    }
    private static void SendDataToDestination(string destination, string data)
    {
        //TODO: Send data using http post
        Thread.Sleep(200);
    }
}

如果一个目的地速度慢或不可用,此修订至少不会影响其他目的地。

然而,这种方法仍然阻塞,我不确定Parallel.ForEach是否使用线程池。我的理解是,它将创建X数量的线程/任务,并一次执行4(4核cpu)。但是它必须在任务5开始之前完成任务1。

因此我的第三个选择:

class ParalleAsyncCase
{
    public static Queue<string> DataQueue = new Queue<string>();
    private static List<string> _destinations = new List<string> { };
    /// <summary>
    /// Is the main function that is run in its own thread
    /// </summary>
    private static void Run()
    {
        while (true)
        {
            if (DataQueue.Count > 0)
            {
                string data = DataQueue.Dequeue();
                List<Task> tasks = new List<Task>();
                foreach (var destination in _destinations)
                {
                    var task = SendDataToDestination(destination, data);
                    task.Start();
                    tasks.Add(task);
                }
                //Wait for all tasks to complete
                Task.WaitAll(tasks.ToArray());
            }
            else
            {
                Thread.Sleep(1);
            }
        }
    }
    private static async Task SendDataToDestination(string destination, string data)
    {
        //TODO: Send data using http post
        await Task.Delay(200);
    }
}

现在从我对这个选项的理解来看,仍然会阻塞在Task.WaitAll(tasks.ToArray());的主线程上,这很好,因为我不想让它跑掉创建任务的速度比它们可以执行的速度快。

但是并行执行的任务应该使用ThreadPool,并且所有X个数的任务应该同时开始执行,而不是阻塞或按顺序执行。(线程池将在它们变为活动状态或awaiting时在它们之间交换)

现在是我的问题。

选项3是否比选项2有性能优势?

特别是在性能更高的服务器端场景中。在我正在开发的具体软件中。上面的简单用例可能有多个实例。有几个消费者。

我对这两种解决方案的理论差异和优缺点很感兴趣,如果有的话,甚至可能有更好的第四个选择。

平行的.重I/O操作中的ForEach与异步Forloop

Parallel.ForEach将使用线程池。异步代码将不使用,因为它根本不需要任何线程(链接到我的博客)。

正如Mrinal指出的,如果你有cpu限制的代码,并行是合适的;如果您有I/o绑定的代码,那么异步是合适的。在这种情况下,HTTP POST显然是I/O,因此理想的消费代码应该是异步的。

如果有的话,也许还有更好的第四个选择。

我建议让你的消费者完全异步。为了做到这一点,你需要使用一个异步兼容的生产者/消费者队列。有一个相当先进的(BufferBlock<T>)在TPL数据流库,和一个相当简单的(AsyncProducerConsumerQueue<T>)在我的AsyncEx库。

使用它们中的任何一个,您都可以创建一个完全异步的消费者:
List<Task> tasks = new List<Task>();
foreach (var destination in _destinations)
{
  var task = SendDataToDestination(destination, data);
  tasks.Add(task);
}
await Task.WhenAll(tasks);

或者更简单的:

var tasks = _destinations
    .Select(destination => SendDataToDestination(destination, data));
await Task.WhenAll(tasks);

你的主要问题-平行。ForEach vs Async Forloop

  • 对于computing operations,在内存处理中总是使用从线程池中调用的Parallel API作为线程来做一些工作,这就是其调用的目的。
  • 对于IO bound operations,总是Async-Await,因为没有线程调用,它使用IO completion ports的硬件能力在后台处理。

由于Async-Await是首选选项,让我指出一些事情在你的实现:

  • 它是Synchronous,因为你没有等待Send data using http post的主要操作,正确的代码应该是await Http Post Async而不是await Task.Delay
  • 如果你正在调用标准Async实现,如Http post Async,你不需要显式启动Task,这只是如果你有自定义Async方法的情况
  • Task.WaitAll将只适用于控制台应用程序,它没有同步上下文或UI线程,否则会导致死锁,你需要使用Task.WhenAll

现在关于Parallel approach

  • 虽然代码是正确的,Parallel API确实在Thread pool上工作,并且大多数情况下它能够重用线程,从而优化,但如果任务长时间运行,它可能最终会创建多个线程,以限制您可以使用构造器选项new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount },从而限制最大数量为系统中的逻辑核心数

对于IO绑定调用来说,Parallel API是一个坏主意的另一个重要原因是,因为每个线程对于UI来说都是一个昂贵的资源,包括Thread environment block + User memory + Kernel Memory的创建,并且在IO操作中它闲置着不做任何事情,这无论如何都不好