线程.睡眠阻塞并行执行任务

本文关键字:并行 执行任务 线程 | 更新日期: 2023-09-27 18:09:51

我正在调用一个worker方法,该方法调用数据库,然后迭代并产生并行处理的返回值。为了防止它冲击数据库,我有一个线程。Sleep在这里暂停对DB的执行。然而,这似乎阻塞了仍然发生在Parallel.ForEach中的执行。实现这一点以防止阻塞的最佳方法是什么?

private void ProcessWorkItems()
{
    _cancellation = new CancellationTokenSource();
    _cancellation.Token.Register(() => WorkItemRepository.ResetAbandonedWorkItems());
    Task.Factory.StartNew(() =>
        Parallel.ForEach(GetWorkItems().AsParallel().WithDegreeOfParallelism(10), workItem =>
        {
            var x = ItemFactory(workItem);
            x.doWork();
        }), _cancellation.Token);
}
private IEnumerable<IAnalysisServiceWorkItem> GetWorkItems()
{
    while (!_cancellation.IsCancellationRequested)
    {
        var workItems = WorkItemRepository.GetItemList(); //database call
        workItems.ForEach(item =>
        {
            item.QueueWorkItem(WorkItemRepository);
        });
        foreach (var item in workItems)
        {
            yield return item;
        }
        if (workItems.Count == 0)
        {
            Thread.Sleep(30000); //sleep this thread for 30 seconds if no work items.
        }
    }
    yield break;
}

编辑:我改变了它,包括答案,它仍然没有工作,因为我期待。我将. asparallel (). withdegreeofparallelism(10)添加到GetWorkItems()调用中。当我认为并行应该继续执行时,即使基本线程正在睡觉,我的期望是否不正确?

的例子:我有15个元素,它迭代并获取10个元素并开始它们。当每个项目完成后,它会从GetWorkItems中请求另一个项目,直到它尝试请求第16个项目。在这一点上,它应该停止尝试获取更多的项目,但应该继续处理项目11-15,直到它们完成。平行应该是这样的吗?因为它目前没有这样做。它当前所做的是,当它完成6时,它锁定后续的10仍然在Parallel.ForEach中运行。

线程.睡眠阻塞并行执行任务

我建议您创建一个工作项的BlockingCollection(一个队列),以及一个每30秒调用数据库来填充它的计时器。比如:

BlockingCollection<WorkItem> WorkItems = new BlockingCollection<WorkItem>();

和初始化时:

System.Threading.Timer WorkItemTimer = new Timer((s) =>
    {
        var items = WorkItemRepository.GetItemList(); //database call
        foreach (var item in items)
        {
            WorkItems.Add(item);
        }
    }, null, 30000, 30000);

将每30秒查询数据库中的项目。

对于安排要处理的工作项,您有许多不同的解决方案。与你所拥有的最接近的是:

WorkItem item;
while (WorkItems.TryTake(out item, Timeout.Infinite, _cancellation))
{
    Task.Factory.StartNew((s) =>
        {
            var myItem = (WorkItem)s;
            // process here
        }, item);
}

这消除了任何线程中的阻塞,并让TPL决定如何最好地分配并行任务。

编辑:

实际上,更接近你的是:

foreach (var item in WorkItems.GetConsumingEnumerable(_cancellation))
{
    // start task to process item
}

你可以使用:

Parallel.Foreach(WorkItems.GetConsumingEnumerable(_cancellation).AsParallel ...

我不知道这是否有效,也不知道效果如何。也许值得一试…

编辑结束

一般来说,我的建议是你把它当作一个生产者/消费者应用程序,生产者是一个线程,定期查询数据库的新项目。我的示例每N秒(在本例中为30秒)查询一次数据库,如果您平均每30秒清空工作队列,那么这种方法就可以很好地工作。这将使从条目发布到数据库的平均延迟时间小于一分钟,直到您得到结果。

您可以降低轮询频率(从而降低延迟),但这会导致更多的数据库流量。

你也可以用它变得更花哨。例如,如果您在30秒后对数据库进行轮询,并且您得到了大量的条目,那么您可能很快就会得到更多的条目,并且您将希望在15秒(或更短)内再次轮询。相反,如果您在30秒后轮询数据库而没有得到任何结果,那么您可能需要等待更长的时间才能再次轮询。

您可以使用一次性计时器设置这种自适应轮询。也就是说,在创建计时器时为最后一个参数指定-1,这将导致它只触发一次。您的计时器回调计算出在下一次轮询之前等待多长时间,并调用Timer.Change以使用新值初始化计时器。

你可以使用。withdegreeofparallelism()扩展方法来强制PLinq同时运行任务。c#线程手册

中的调用阻塞或I/O密集一节中有一个很好的例子。

您可能与分区程序发生冲突。

因为你传递的是IEnumerable, Parallel。ForEach将使用一个Chunk Partitioner,它可以尝试一次从块中的枚举中抓取几个元素。但是你的IEnumerable。MoveNext可以睡觉,这会打乱事情。

您可以编写自己的Partitioner,每次返回一个元素,但无论如何,我认为像Jim Mischel建议的生产者/消费者方法会更好。

你想用睡眠来完成什么?据我所知,您在试图避免频繁地调用数据库。我不知道有更好的方法来做到这一点,但似乎理想情况下,您的GetItemList调用将被阻塞,直到数据可用来处理。