具有并发调度程序的异步生产者-消费者应用程序

本文关键字:生产者 消费者 应用程序 异步 并发 调度程序 | 更新日期: 2023-09-27 18:25:38

我使用BufferBlock实现了一个生产者-消费者。代码运行良好。

static async Task Produce(ITargetBlock<int> queue)
{
    try
    {
        // Post messages to the block asynchronously. 
        for (int i = 0; i < 100; i++)
        {
            Console.WriteLine("Sending: {0}", i);
            await queue.SendAsync(i);
        }
    }
    finally 
    {
        queue.Complete();
    }
}
static async Task Consume(ISourceBlock<int> queue)
{
    // Read messages from the block asynchronously. 
    while (await queue.OutputAvailableAsync())
    {
        int value = await queue.ReceiveAsync();
        Console.WriteLine("Receiving: {0}", value);
    }
}
static void Main(string[] args)
{
    // Create a BufferBlock<int> object. 
    var queue = new BufferBlock<int>();
    try
    {
        var produce = Produce(queue);
        var consume = Consume(queue);
        Task.WaitAll(produce, consume, queue.Completion);
    }
    catch (Exception exception)
    {
        Console.WriteLine("An exception was thrown: {0}", exception.Message);
        Console.WriteLine("Terminating...");
    }
}

现在我有一个节流问题,那就是我希望消费者的最大并发数是4。我想使用SemaphoreSlim机器人不知道如何应用它。

注意:这是一个并发调度程序问题,而不是并行性问题。

具有并发调度程序的异步生产者-消费者应用程序

如果您只想一次消耗一定的量,您只需多次调用TryRecieve,直到它变空或达到该量。这里有一个处理这个问题的扩展方法:

public static bool TryReceive<T>(this BufferBlock<T> bufferBlock, int count, out IList<T> items)
{
    items = new List<T>();   
    for (var i = 0; i < count; i++)
    {
        T item;
        if (bufferBlock.TryReceive(out item))
        {
            items.Add(item);
        }
        else
        {
            break;
        }
    }
    return items.Any();
}

因此,消费者变成:

static async Task Consume(BufferBlock<int> queue)
{
    // Read messages from the block asynchronously. 
    while (await queue.OutputAvailableAsync())
    {
        IList<int> values;
        queue.TryReceive(4, out values);
        Console.WriteLine("Receiving: {0}", string.Join(", ", values));
    }
}