批处理程序(从队列中聚合项目)

本文关键字:项目 队列 程序 批处理 | 更新日期: 2023-09-27 18:03:38

我有一个每3秒运行一次的System.Timers.Timer
一旦过期,我想取集合中的所有项,并在一个批次中处理它们。

这样做的动机是为了减少后端系统上的I/o数量。

挑战在于我有多个并发线程附加到集合/队列。正因为如此,我想使用ConcurrentQueue<T> -但这是一个糟糕的选择。

这篇关于社交msdn的文章很好地描述了这里的问题。

我需要的是一个集合/队列,我可以一次获得所有数据(ToArray()),并在一个原子操作中清除队列,这样我就不会丢失其他线程同时写入集合/队列的任何数据。

 private static void T1_Elapsed(object sender, ElapsedEventArgs e)
 {
    string[] result = _queue.ToArray();
   _queue = new ConcurrentQueue<string>(); // strings will be lost :-)
 }

我倾向于在简单的Queue<T>上使用简单的基于锁的方法。

 private static readonly object _myLock = new object();
 private static void T1_Elapsed(object sender, ElapsedEventArgs e)
 {
     string[] result;
     lock (_myLock)
     {
         result = _queue.ToArray();
         _queue.Clear();
     }
 }

现在这段代码有一个明显的缺陷,可以在生产者代码中看到:

private static void ProduceItems()
{
    //while (!_stop)
    for(int i=0; i<int.MaxValue; i++)
    {
        if (_stop) break;
        lock (_myLock) // bad. locks out other producers running on other threads.
        {
            Console.WriteLine("Enqueue " + i);
            _queue.Enqueue("string" + i);
        }
        Thread.Sleep(1000); // FOR DEBUGGING PURPOSES ONLY
    }
}

当然,这段代码将锁定任何其他试图添加到队列的生产者。如果设置了"T1_Elapsed"锁,是否有任何方法可以只验证生产者中的锁?

有什么更适合我的问题吗?也许有什么可观察到的?或者有没有好的"批处理/聚合"的例子?

UPDATE 1: RX
你可以用RX做的事情真是太棒了:)
我仍在研究如何在这种情况下处理错误、重试或重新排队。

internal class Rx
{
    internal static void Start()
    {
        ISubject<int> subject = new Subject<int>();
        ISubject<int> syncedSubject = Subject.Synchronize(subject); // that should do it? - UNTESTED!
        var subscription = syncedSubject.Buffer(TimeSpan.FromSeconds(5), 10)
            .Subscribe((item) => ProcessBatch(item));
        for (int i=1; i<int.MaxValue; i++)
        {
            syncedSubject.OnNext(i);
            Thread.Sleep(200);
            Console.WriteLine($"Produced {i}.");
        }
        Console.ReadKey();
        subscription.Dispose();
    }
    private static void ProcessBatch(IList<int> list)
    {
        // Aggregate many into one
        string joined = string.Join(" ", list);
        // Process one
        Console.WriteLine($"Wrote {joined} to remote storage.");
        // how do you account for errors here?
        myProducer.ReEnqueueMyFailedItems(list); // ?
    }
}

批处理程序(从队列中聚合项目)

TPL DataFlow

我会说给TPL DataFlow库一个尝试。它建立在任务并行库的基础上,专为这些并发性扮演重要角色的需求而设计。有关这个库的一系列博客文章,请参阅http://blog.stephencleary.com/2012/09/introduction-to-dataflow-part-1.html。

BatchBlock似乎很适合您的场景。参见https://msdn.microsoft.com/en-us/library/hh228602(v=vs.110).aspx获取教程。

另一个使用BatchBlock的例子:https://taskmatics.com/blog/simplifying-producer-consumer-processing-with-tpl-dataflow-structures/

不是将数据发布到队列,而是将数据发布到可用的TPL数据流块之一。

另一个选项可以使用

<<p> 活性扩展/strong>

见http://www.introtorx.com/uat/content/v1.0.10621.0/01_WhyRx.html,有很好的介绍

它还提供批处理支持:

void Sample()
{
    var dataprovider = new Subject<int>();
    var subscription = dataprovider
        .Buffer(TimeSpan.FromMinutes(3))
        .Subscribe(listOfNumbers => 
        {
            // do something with batch of items
            var batchSize = listOfNumbers.Count;
        });
    for(int i = 0; i <= 5; ++i)
    {
        dataprovider.OnNext(i);
    }
    subscription.Dispose();
}

在上面的例子中,你需要一些修改来允许来自不同线程的多个生产者添加数据,参见响应式扩展OnNext。这是一段简化的代码(!),但它让你对使用RX的概念有了一个大致的了解。

缓冲可以使用最大缓冲区大小、给定时间段或两者的组合来完成。所以它也可以代替你的计时器。

Subject上调用OnNext而不是将项目添加到队列

TPL DataFlow和RX都消除了需要清除的队列或类似的东西的使用,因此它将使您摆脱这种痛苦。