限制正在并行运行的队列的消费者数量

本文关键字:队列 消费者 运行 并行 | 更新日期: 2023-09-27 17:50:42

我正在使用队列,有两个线程。一个用于Enqueue,另一个用于Dequeue。它们分别被称为生产者和消费者。农产品可以是无限的。但我需要限制消费者同时运行。我读了"Task Parallel Library"answers"Parallel. for"。但我不确定我应该如何在这里实现它们。请告诉我。以下是一些代码段,可以帮助您更好地理解

问题
static void Main(string[] args)
{
// The Producer code comes here
// ...
// The Consumer code comes here
Thread consumer = new Thread(new ThreadStart(PendingBookingConsumer));
consumer.Start();
}
private static void PendingBookingConsumer()
{
    try
    {
        while (true)
        {
            if (pendingBookingsQueue != null && pendingBookingsQueue.Count > 0)
            {
                PendingBooking oPendingBooking = pendingBookingsQueue.Dequeue();
                //Run the Console App
                string command = @"C:'ServerAgentConsole.exe";
                string args = oPendingBooking.Id + " " + oPendingBooking.ServiceAccountEmail.Trim() + " " + oPendingBooking.ServiceAccountPassword.Trim() + " " + oPendingBooking.ServiceAccountEmail.Trim()
                    + " " + oPendingBooking.MailBoxOwnerEmail.Trim() + " " + oPendingBooking.Method.Trim();
                Process process = new Process();
                process.StartInfo.FileName = command;
                process.StartInfo.Arguments = args;
                process.EnableRaisingEvents = true;
                process.Exited += (sender, e) =>
                {
                    Process myProcess = (Process)sender;
                    Console.WriteLine("Agent for booking ID :" + myProcess.StartInfo.Arguments[0] + " Done");
                };
                process.Start();
                Thread.Sleep(2);
            }
        }
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.Message);
    }
}

限制正在并行运行的队列的消费者数量

使用一种常用技术来处理具有固定并行度的BlockingCollection。在Parallel.ForEach选项中指定DOP

然后,让处理函数等待子进程:

process.Start();
process.WaitForExit();

这样在任何时候都有固定数量的未完成子进程

您还可以考虑TPL Dataflow库,它可以轻松实现Producer/Consumer模式:

private static BufferBlock<int> m_buffer = new BufferBlock<int>>(
    new DataflowBlockOptions { BoundedCapacity = 10, MaxDegreeOfParallelism = 4 });
// Producer
private static async void Producer()
{
    while(true)
    {
        await m_buffer.SendAsync(Produce());
    }
}
// Consumer
private static async Task Consumer()
{
    while(true)
    {
        Process(await m_buffer.ReceiveAsync());
    }
}

您可以看到BoundedCapacity用于限制队列大小的节流技术,MaxDegreeOfParallelism用于限制并行消耗任务。

您可以从MSDN下载TPL Dataflow的介绍。

PS:如何:在MSDN上实现生产者-消费者数据流模式

BlockingCollection支持上限

BlockingCollection Constructor (Int32)

正如Hans所说,这只是集合的大小。
也许你可以在消费者中使用并行。