带有批处理生产者的生产者/消费者模式

本文关键字:生产者 消费者 模式 批处理 | 更新日期: 2023-09-27 18:10:34

我正在尝试实现一个相当简单的生产者/消费者风格的应用程序,有多个生产者和一个消费者。

研究让我想到了BlockingCollection<T>,它很有用,允许我实现一个长时间运行的消费者任务,如下所示:

var c1 = Task.Factory.StartNew(() =>
{
    var buffer = new List<int>(BATCH_BUFFER_SIZE);
    foreach (var value in blockingCollection.GetConsumingEnumerable())
    {
        buffer.Add(value);
        if (buffer.Count == BATCH_BUFFER_SIZE)
        {
            ProcessItems(buffer);
            buffer.Clear();
        }
    }
});

ProcessItems函数将缓冲区提交给数据库,并且它批量工作。然而,这个解决方案不是最优的。在大多数生产阶段,可能需要一段时间缓冲区才会被填满,这意味着数据库已经过期。

更理想的解决方案是在30秒计时器上运行任务或超时短路foreach

我带着计时器的想法,想出了这个:

syncTimer = new Timer(new TimerCallback(TimerElapsed), blockingCollection, 5000, 5000);
private static void TimerElapsed(object state)
{
    var buffer = new List<int>();
    var collection = ((BlockingCollection<int>)state).GetConsumingEnumerable();
    foreach (var value in collection)
    {
        buffer.Add(value);
    }
    ProcessItems(buffer);
    buffer.Clear();
}

这有一个明显的问题,foreach将被阻塞,直到最后,击败定时器的目的。

有谁能提供一个方向吗?我基本上需要定期快照BlockingCollection,并处理内容清除它。也许BlockingCollection是错误的类型?

带有批处理生产者的生产者/消费者模式

在计时器回调中不使用GetConsumingEnumerable,而是使用以下方法之一,将结果添加到列表中,直到它返回false或达到满意的批处理大小。

BlockingCollection。TryTake Method (T) -可能是你需要的,你根本不想执行进一步的等待。

BlockingCollection。TryTake Method (T, Int32)

BlockingCollection。TryTake Method (T, TimeSpan)

您可以轻松地将其提取到扩展中(未经测试):

public static IList<T> Flush<T>
(this BlockingCollection<T> collection, int maxSize = int.MaxValue)
{
     // Argument checking.
     T next;
     var result = new List<T>();
     while(result.Count < maxSize && collection.TryTake(out next))
     {
         result.Add(next);
     }
     return result;
}