TPL架构问题
本文关键字:问题 TPL | 更新日期: 2023-09-27 17:59:03
我目前正在做一个项目,在这个项目中,我们面临着并行处理项目的挑战。到目前为止没什么大不了的;)现在来谈谈问题。我们有一个ID列表,其中我们定期(每2秒)为每个ID调用StoredProcedure。需要分别检查每个项目的2秒,因为它们在运行时添加和删除。此外,我们希望配置最大的并行度,因为数据库不应该同时充斥着300个线程。正在处理的项目在完成上一次执行之前不应重新安排处理时间。原因是,我们希望防止在数据库出现延迟的情况下排队等候大量项目。
现在,我们使用的是一个自行开发的组件,它有一个主线程,定期检查需要调度哪些项目进行处理。一旦它有了列表,它就把它们放在一个基于IOCP的自定义线程池中,然后使用等待句柄来等待正在处理的项目。然后开始下一次迭代。IOCP,因为它提供了窃取工作。
我想用TPL/.NET4版本取代这个自定义实现,我想知道您将如何解决它(理想情况下是简单且可读/可维护)。我知道这篇文章:http://msdn.microsoft.com/en-us/library/ee789351.aspx,但这只是限制了正在使用的线程数量。下班后偷窃,定期执行项目。。。。
理想情况下,它将成为一个通用组件,可以用于项目列表中需要定期完成的所有任务。
欢迎任何输入,tiaMartin
我认为您实际上不需要为此而直接使用TPL Tasks
。首先,我会在ConcurrentQueue
(默认值)周围设置一个BlockingCollection
,而在BlockingCollection
上没有设置BoundedCapacity
来存储需要处理的ID。
// Setup the blocking collection somewhere when your process starts up (OnStart for a Windows service)
BlockingCollection<string> idsToProcess = new BlockingCollection<string>();
从那里,我只会在BlockingCollection::GetConsumingEnumerable
返回的枚举中使用Parallel::ForEach
。在ForEach
调用中,您将设置ParallelOptions::MaxDegreeOfParallelism
。在ForEach
的主体内,您将执行存储过程。
现在,一旦存储过程执行完成,您就表示不希望将执行重新安排为至少两秒钟。没问题,安排一个带有回调的System.Threading.Timer
,它只需将ID添加回所提供回调中的BlockingCollection
。
Parallel.ForEach(
idsToProcess.GetConsumingEnumerable(),
new ParallelOptions
{
MaxDegreeOfParallelism = 4 // read this from config
},
(id) =>
{
// ... execute sproc ...
// Need to declare/assign this before the delegate so that we can dispose of it inside
Timer timer = null;
timer = new Timer(
_ =>
{
// Add the id back to the collection so it will be processed again
idsToProcess.Add(id);
// Cleanup the timer
timer.Dispose();
},
null, // no state, id wee need is "captured" in the anonymous delegate
2000, // probably should read this from config
Timeout.Infinite);
}
最后,当进程关闭时,您将调用BlockingCollection::CompleteAdding
,以便使用stop blocking和complete处理的可枚举对象以及Parallel::ForEach将退出。例如,如果这是一个Windows服务,您可以在OnStop
中执行此操作。
// When ready to shutdown you just signal you're done adding
idsToProcess.CompleteAdding();
更新
你在评论中提出了一个合理的担忧,即你可能在任何给定的时间点处理大量的ID,并担心每个ID的计时器会有太多的开销。我绝对同意这一点。因此,在您同时处理一个大的ID列表的情况下,我会从每个ID使用一个计时器改为使用另一个队列来保存"睡眠"ID,该队列由一个短间隔计时器监控。首先,您需要一个ConcurrentQueue
来放置休眠的ID:
ConcurrentQueue<Tuple<string, DateTime>> sleepingIds = new ConcurrentQueue<Tuple<string, DateTime>>();
现在,我在这里使用一个由两部分组成的Tuple
进行说明,但为了更好的可读性,您可能需要为它创建一个更强类型的结构(或者至少用using
语句将其别名)。元组具有id和DateTime,DateTime表示它何时被放入队列。
现在,您还需要设置将监视此队列的计时器:
Timer wakeSleepingIdsTimer = new Timer(
_ =>
{
DateTime utcNow = DateTime.UtcNow;
// Pull all items from the sleeping queue that have been there for at least 2 seconds
foreach(string id in sleepingIds.TakeWhile(entry => (utcNow - entry.Item2).TotalSeconds >= 2))
{
// Add this id back to the processing queue
idsToProcess.Enqueue(id);
}
},
null, // no state
Timeout.Infinite, // no due time
100 // wake up every 100ms, probably should read this from config
);
然后,您只需更改Parallel::ForEach
以执行以下操作,而不是为每个操作设置计时器:
(id) =>
{
// ... execute sproc ...
sleepingIds.Enqueue(Tuple.Create(id, DateTime.UtcNow));
}
这与您在问题中所说的方法非常相似,但在TPL任务中也是如此。任务只是在完成时将自己添加回要安排的事情列表中。
在这个例子中,在纯列表上使用锁定是相当丑陋的,可能需要一个更好的集合来保存要调度的列表
// Fill the idsToSchedule
for (int id = 0; id < 5; id++)
{
idsToSchedule.Add(Tuple.Create(DateTime.MinValue, id));
}
// LongRunning will tell TPL to create a new thread to run this on
Task.Factory.StartNew(SchedulingLoop, TaskCreationOptions.LongRunning);
这启动了SchedulingLoop,它实际上执行检查是否有东西运行已经两秒钟了
// Tuple of the last time an id was processed and the id of the thing to schedule
static List<Tuple<DateTime, int>> idsToSchedule = new List<Tuple<DateTime, int>>();
static int currentlyProcessing = 0;
const int ProcessingLimit = 3;
// An event loop that performs the scheduling
public static void SchedulingLoop()
{
while (true)
{
lock (idsToSchedule)
{
DateTime currentTime = DateTime.Now;
for (int index = idsToSchedule.Count - 1; index >= 0; index--)
{
var scheduleItem = idsToSchedule[index];
var timeSincePreviousRun = (currentTime - scheduleItem.Item1).TotalSeconds;
// start it executing in a background task
if (timeSincePreviousRun > 2 && currentlyProcessing < ProcessingLimit)
{
Interlocked.Increment(ref currentlyProcessing);
Console.WriteLine("Scheduling {0} after {1} seconds", scheduleItem.Item2, timeSincePreviousRun);
// Schedule this task to be processed
Task.Factory.StartNew(() =>
{
Console.WriteLine("Executing {0}", scheduleItem.Item2);
// simulate the time taken to call this procedure
Thread.Sleep(new Random((int)DateTime.Now.Ticks).Next(0, 5000) + 500);
lock (idsToSchedule)
{
idsToSchedule.Add(Tuple.Create(DateTime.Now, scheduleItem.Item2));
}
Console.WriteLine("Done Executing {0}", scheduleItem.Item2);
Interlocked.Decrement(ref currentlyProcessing);
});
// remove this from the list of things to schedule
idsToSchedule.RemoveAt(index);
}
}
}
Thread.Sleep(100);
}
}