确保长时间运行的任务只触发一次,并且随后的请求进入队列,但队列中只有一个条目

本文关键字:队列 请求 长时间 有一个 运行 确保 一次 任务 | 更新日期: 2023-09-27 18:17:04

我有一个计算密集型方法Calculate,可能会运行几秒钟,请求来自多个线程。

应该只执行一个Calculate,后续请求应该排队,直到初始请求完成。如果已经有一个请求排队,那么后续请求可以被丢弃(因为排队的请求将是足够的)

似乎有很多可能的解决方案,但我只需要最简单的。

更新:这是我的初步尝试:

private int _queueStatus;
private readonly object _queueStatusSync = new Object();
public void Calculate()
{
    lock(_queueStatusSync)
    {
        if(_queueStatus == 2) return;
        _queueStatus++;
        if(_queueStatus == 2) return;
    }
    for(;;)
    {
        CalculateImpl();
        lock(_queueStatusSync)
            if(--_queueStatus == 0) return;
    }
}
private void CalculateImpl()
{
    // long running process will take a few seconds...
}

确保长时间运行的任务只触发一次,并且随后的请求进入队列,但队列中只有一个条目

IMO最简单,最干净的解决方案是使用TPL Dataflow(一如既往)与BufferBlock作为队列。BufferBlock是线程安全的,支持async-await,更重要的是,TryReceiveAll可以一次获取所有项。它也有OutputAvailableAsync,所以你可以异步等待项目被提交到缓冲区。当发布多个请求时,您只需选择最后一个请求,而忘记其他请求:

var buffer = new BufferBlock<Request>();
var task = Task.Run(async () =>
{
    while (await buffer.OutputAvailableAsync())
    {
        IList<Request> requests;
        buffer.TryReceiveAll(out requests);
        Calculate(requests.Last());
    }
});

用法:

buffer.Post(new Request());
buffer.Post(new Request());

Edit:如果您没有Calculate方法的任何输入或输出,您可以简单地使用boolean作为开关。如果它为真,你可以关闭它并计算,如果它在Calculate运行时再次为真,那么再次计算:

public bool _shouldCalculate;
public void Producer()
{
    _shouldCalculate = true;
}
public async Task Consumer()
{
    while (true)
    {
        if (!_shouldCalculate)
        {
            await Task.Delay(1000);
        }
        else
        {
            _shouldCalculate = false;
            Calculate();
        }
    }
}

一次只取1个值的BlockingCollection
窍门是,如果集合

中有任何项,则跳过

我同意I3aron +1的答案
这(可能)是一个BlockingCollection解决方案

public static void BC_AddTakeCompleteAdding()
{
    using (BlockingCollection<int> bc = new BlockingCollection<int>(1))
    {
        // Spin up a Task to populate the BlockingCollection  
        using (Task t1 = Task.Factory.StartNew(() =>
        {
            for (int i = 0; i < 100; i++)
            {
                if (bc.TryAdd(i))
                {
                    Debug.WriteLine("  add  " + i.ToString());
                }
                else
                {
                    Debug.WriteLine("  skip " + i.ToString());
                }
                Thread.Sleep(30);
            }
            bc.CompleteAdding();
        }))
        {
            // Spin up a Task to consume the BlockingCollection 
            using (Task t2 = Task.Factory.StartNew(() =>
            {
                try
                {
                    // Consume consume the BlockingCollection 
                    while (true)
                    {
                        Debug.WriteLine("take " + bc.Take());
                        Thread.Sleep(100);
                    }
                }
                catch (InvalidOperationException)
                {
                    // An InvalidOperationException means that Take() was called on a completed collection
                    Console.WriteLine("That's All!");
                }
            }))
                Task.WaitAll(t1, t2);
        }
    }
}

这听起来像是典型的生产者-消费者关系。我建议查看BlockingCollection<T>。它是System.Collection.Concurrent命名空间的一部分。在此基础上,您可以实现您的队列逻辑。

你可以给BlockingCollection提供任何内部结构来保存它的数据,比如ConcurrentBag<T>, ConcurrentQueue<T>等等。后者是使用的默认结构