线程.睡眠阻塞并行执行任务
本文关键字:并行 执行任务 线程 | 更新日期: 2023-09-27 18:09:51
我正在调用一个worker方法,该方法调用数据库,然后迭代并产生并行处理的返回值。为了防止它冲击数据库,我有一个线程。Sleep在这里暂停对DB的执行。然而,这似乎阻塞了仍然发生在Parallel.ForEach中的执行。实现这一点以防止阻塞的最佳方法是什么?
private void ProcessWorkItems()
{
_cancellation = new CancellationTokenSource();
_cancellation.Token.Register(() => WorkItemRepository.ResetAbandonedWorkItems());
Task.Factory.StartNew(() =>
Parallel.ForEach(GetWorkItems().AsParallel().WithDegreeOfParallelism(10), workItem =>
{
var x = ItemFactory(workItem);
x.doWork();
}), _cancellation.Token);
}
private IEnumerable<IAnalysisServiceWorkItem> GetWorkItems()
{
while (!_cancellation.IsCancellationRequested)
{
var workItems = WorkItemRepository.GetItemList(); //database call
workItems.ForEach(item =>
{
item.QueueWorkItem(WorkItemRepository);
});
foreach (var item in workItems)
{
yield return item;
}
if (workItems.Count == 0)
{
Thread.Sleep(30000); //sleep this thread for 30 seconds if no work items.
}
}
yield break;
}
编辑:我改变了它,包括答案,它仍然没有工作,因为我期待。我将. asparallel (). withdegreeofparallelism(10)添加到GetWorkItems()调用中。当我认为并行应该继续执行时,即使基本线程正在睡觉,我的期望是否不正确?
的例子:我有15个元素,它迭代并获取10个元素并开始它们。当每个项目完成后,它会从GetWorkItems中请求另一个项目,直到它尝试请求第16个项目。在这一点上,它应该停止尝试获取更多的项目,但应该继续处理项目11-15,直到它们完成。平行应该是这样的吗?因为它目前没有这样做。它当前所做的是,当它完成6时,它锁定后续的10仍然在Parallel.ForEach中运行。
我建议您创建一个工作项的BlockingCollection(一个队列),以及一个每30秒调用数据库来填充它的计时器。比如:
BlockingCollection<WorkItem> WorkItems = new BlockingCollection<WorkItem>();
和初始化时:
System.Threading.Timer WorkItemTimer = new Timer((s) =>
{
var items = WorkItemRepository.GetItemList(); //database call
foreach (var item in items)
{
WorkItems.Add(item);
}
}, null, 30000, 30000);
将每30秒查询数据库中的项目。
对于安排要处理的工作项,您有许多不同的解决方案。与你所拥有的最接近的是:
WorkItem item;
while (WorkItems.TryTake(out item, Timeout.Infinite, _cancellation))
{
Task.Factory.StartNew((s) =>
{
var myItem = (WorkItem)s;
// process here
}, item);
}
这消除了任何线程中的阻塞,并让TPL决定如何最好地分配并行任务。
编辑:实际上,更接近你的是:
foreach (var item in WorkItems.GetConsumingEnumerable(_cancellation))
{
// start task to process item
}
你可以使用:
Parallel.Foreach(WorkItems.GetConsumingEnumerable(_cancellation).AsParallel ...
我不知道这是否有效,也不知道效果如何。也许值得一试…
编辑结束
一般来说,我的建议是你把它当作一个生产者/消费者应用程序,生产者是一个线程,定期查询数据库的新项目。我的示例每N秒(在本例中为30秒)查询一次数据库,如果您平均每30秒清空工作队列,那么这种方法就可以很好地工作。这将使从条目发布到数据库的平均延迟时间小于一分钟,直到您得到结果。
您可以降低轮询频率(从而降低延迟),但这会导致更多的数据库流量。
你也可以用它变得更花哨。例如,如果您在30秒后对数据库进行轮询,并且您得到了大量的条目,那么您可能很快就会得到更多的条目,并且您将希望在15秒(或更短)内再次轮询。相反,如果您在30秒后轮询数据库而没有得到任何结果,那么您可能需要等待更长的时间才能再次轮询。
您可以使用一次性计时器设置这种自适应轮询。也就是说,在创建计时器时为最后一个参数指定-1,这将导致它只触发一次。您的计时器回调计算出在下一次轮询之前等待多长时间,并调用Timer.Change
以使用新值初始化计时器。
你可以使用。withdegreeofparallelism()扩展方法来强制PLinq同时运行任务。c#线程手册
您可能与分区程序发生冲突。
因为你传递的是IEnumerable, Parallel。ForEach将使用一个Chunk Partitioner,它可以尝试一次从块中的枚举中抓取几个元素。但是你的IEnumerable。MoveNext可以睡觉,这会打乱事情。
您可以编写自己的Partitioner,每次返回一个元素,但无论如何,我认为像Jim Mischel建议的生产者/消费者方法会更好。
你想用睡眠来完成什么?据我所知,您在试图避免频繁地调用数据库。我不知道有更好的方法来做到这一点,但似乎理想情况下,您的GetItemList
调用将被阻塞,直到数据可用来处理。