TPL架构问题

本文关键字:问题 TPL | 更新日期: 2023-09-27 17:59:03

我目前正在做一个项目,在这个项目中,我们面临着并行处理项目的挑战。到目前为止没什么大不了的;)现在来谈谈问题。我们有一个ID列表,其中我们定期(每2秒)为每个ID调用StoredProcedure。需要分别检查每个项目的2秒,因为它们在运行时添加和删除。此外,我们希望配置最大的并行度,因为数据库不应该同时充斥着300个线程。正在处理的项目在完成上一次执行之前不应重新安排处理时间。原因是,我们希望防止在数据库出现延迟的情况下排队等候大量项目。

现在,我们使用的是一个自行开发的组件,它有一个主线程,定期检查需要调度哪些项目进行处理。一旦它有了列表,它就把它们放在一个基于IOCP的自定义线程池中,然后使用等待句柄来等待正在处理的项目。然后开始下一次迭代。IOCP,因为它提供了窃取工作。

我想用TPL/.NET4版本取代这个自定义实现,我想知道您将如何解决它(理想情况下是简单且可读/可维护)。我知道这篇文章:http://msdn.microsoft.com/en-us/library/ee789351.aspx,但这只是限制了正在使用的线程数量。下班后偷窃,定期执行项目。。。。

理想情况下,它将成为一个通用组件,可以用于项目列表中需要定期完成的所有任务。

欢迎任何输入,tiaMartin

TPL架构问题

我认为您实际上不需要为此而直接使用TPL Tasks。首先,我会在ConcurrentQueue(默认值)周围设置一个BlockingCollection,而在BlockingCollection上没有设置BoundedCapacity来存储需要处理的ID。

// Setup the blocking collection somewhere when your process starts up (OnStart for a Windows service)
BlockingCollection<string> idsToProcess = new BlockingCollection<string>();

从那里,我只会在BlockingCollection::GetConsumingEnumerable返回的枚举中使用Parallel::ForEach。在ForEach调用中,您将设置ParallelOptions::MaxDegreeOfParallelism。在ForEach的主体内,您将执行存储过程。

现在,一旦存储过程执行完成,您就表示不希望将执行重新安排为至少两秒钟。没问题,安排一个带有回调的System.Threading.Timer,它只需将ID添加回所提供回调中的BlockingCollection

Parallel.ForEach(
    idsToProcess.GetConsumingEnumerable(),
    new ParallelOptions 
    { 
        MaxDegreeOfParallelism = 4 // read this from config
    },
    (id) =>
    {
       // ... execute sproc ...
       // Need to declare/assign this before the delegate so that we can dispose of it inside 
       Timer timer = null;
       timer = new Timer(
           _ =>
           {
               // Add the id back to the collection so it will be processed again
               idsToProcess.Add(id);
               // Cleanup the timer
               timer.Dispose();
           },
           null, // no state, id wee need is "captured" in the anonymous delegate
           2000, // probably should read this from config
           Timeout.Infinite);
    }

最后,当进程关闭时,您将调用BlockingCollection::CompleteAdding,以便使用stop blocking和complete处理的可枚举对象以及Parallel::ForEach将退出。例如,如果这是一个Windows服务,您可以在OnStop中执行此操作。

// When ready to shutdown you just signal you're done adding
idsToProcess.CompleteAdding();

更新

你在评论中提出了一个合理的担忧,即你可能在任何给定的时间点处理大量的ID,并担心每个ID的计时器会有太多的开销。我绝对同意这一点。因此,在您同时处理一个大的ID列表的情况下,我会从每个ID使用一个计时器改为使用另一个队列来保存"睡眠"ID,该队列由一个短间隔计时器监控。首先,您需要一个ConcurrentQueue来放置休眠的ID:

ConcurrentQueue<Tuple<string, DateTime>> sleepingIds = new ConcurrentQueue<Tuple<string, DateTime>>();

现在,我在这里使用一个由两部分组成的Tuple进行说明,但为了更好的可读性,您可能需要为它创建一个更强类型的结构(或者至少用using语句将其别名)。元组具有id和DateTime,DateTime表示它何时被放入队列。

现在,您还需要设置将监视此队列的计时器:

Timer wakeSleepingIdsTimer = new Timer(
   _ =>
   {
       DateTime utcNow = DateTime.UtcNow;
       // Pull all items from the sleeping queue that have been there for at least 2 seconds
       foreach(string id in sleepingIds.TakeWhile(entry => (utcNow - entry.Item2).TotalSeconds >= 2))
       {
           // Add this id back to the processing queue
           idsToProcess.Enqueue(id);
       }
   },
   null, // no state
   Timeout.Infinite, // no due time
   100 // wake up every 100ms, probably should read this from config
 );

然后,您只需更改Parallel::ForEach以执行以下操作,而不是为每个操作设置计时器:

(id) =>
{
       // ... execute sproc ...
       sleepingIds.Enqueue(Tuple.Create(id, DateTime.UtcNow)); 
}

这与您在问题中所说的方法非常相似,但在TPL任务中也是如此。任务只是在完成时将自己添加回要安排的事情列表中。

在这个例子中,在纯列表上使用锁定是相当丑陋的,可能需要一个更好的集合来保存要调度的列表

// Fill the idsToSchedule
for (int id = 0; id < 5; id++)
{
    idsToSchedule.Add(Tuple.Create(DateTime.MinValue, id));
}
// LongRunning will tell TPL to create a new thread to run this on
Task.Factory.StartNew(SchedulingLoop, TaskCreationOptions.LongRunning);

这启动了SchedulingLoop,它实际上执行检查是否有东西运行已经两秒钟了

// Tuple of the last time an id was processed and the id of the thing to schedule
static List<Tuple<DateTime, int>> idsToSchedule = new List<Tuple<DateTime, int>>();
static int currentlyProcessing = 0;
const int ProcessingLimit = 3;
// An event loop that performs the scheduling
public static void SchedulingLoop()
{
    while (true)
    {
        lock (idsToSchedule)
        {
            DateTime currentTime = DateTime.Now;
            for (int index = idsToSchedule.Count - 1; index >= 0; index--)
            {
                var scheduleItem = idsToSchedule[index];
                var timeSincePreviousRun = (currentTime - scheduleItem.Item1).TotalSeconds;
                // start it executing in a background task
                if (timeSincePreviousRun > 2 && currentlyProcessing < ProcessingLimit)
                {
                    Interlocked.Increment(ref currentlyProcessing);
                    Console.WriteLine("Scheduling {0} after {1} seconds", scheduleItem.Item2, timeSincePreviousRun);
                    // Schedule this task to be processed
                    Task.Factory.StartNew(() =>
                        {
                            Console.WriteLine("Executing {0}", scheduleItem.Item2);
                            // simulate the time taken to call this procedure
                            Thread.Sleep(new Random((int)DateTime.Now.Ticks).Next(0, 5000) + 500);
                            lock (idsToSchedule)
                            {
                                idsToSchedule.Add(Tuple.Create(DateTime.Now, scheduleItem.Item2));
                            }
                            Console.WriteLine("Done Executing {0}", scheduleItem.Item2);
                            Interlocked.Decrement(ref currentlyProcessing);
                        });
                    // remove this from the list of things to schedule
                    idsToSchedule.RemoveAt(index);
                }
            }
        }
        Thread.Sleep(100);
    }
}