使用一个或多个标准FIFO队列实现延迟队列

本文关键字:队列 标准 FIFO 延迟 实现 一个 | 更新日期: 2023-09-27 17:50:37

延迟队列是这样一种队列,其中每个消息都有一个与之关联的延迟时间,并且只有当其延迟过期时,消息才能被接收。队列的头是过去延迟过期时间最长的消息。如果没有延迟过期,则没有head, dequeue将返回null。

实际上,我正在使用Azure编写云应用程序,在Azure中只有FIFO队列可用,而不是优先级/延迟队列。所以我来这里是想看看有没有人能给我一些指点,让我在正确的方向上开始。我在谷歌上搜索了很多,但只找到了Java中的延迟队列实现,没有标准的教程/研究论文讨论延迟队列的一般情况。

编辑:

我有什么代码?
实际上,我必须先设计这些东西,然后把它交给我的经理,一旦我们完成了设计,只有我才能开始编码。

关于场景的更多详细信息
它是一个基于主/从模型的分布式应用程序。主服务器生成消息并将其放入Azure服务总线队列中,并且有多个从服务器(运行在多台机器上)从队列中读取并处理消息。如果一个主服务器发生故障,那么其中一个从服务器充当主服务器并开始生成消息。我不想在master中存储任何状态信息因为一旦master失效所有的状态信息也会随之消失

使用一个或多个标准FIFO队列实现延迟队列

Windows Azure队列消息具有延迟(以秒为单位),您可以在将消息插入队列时指定该延迟。在达到超时延迟之前,消息将不可见。请参阅这篇MSDN文章以查看API的详细信息。

不可见超时也在各种语言SDK实现中实现。由于您正在使用c#,下面是AddMessage()调用的样子。注意AddMessage()的第三个参数指定了不可见超时:

        var acct = CloudStorageAccount.DevelopmentStorageAccount;
        var queueClient = acct.CreateCloudQueueClient();
        var queue = queueClient.GetQueueReference("myqueue");
        queue.CreateIfNotExist();
        var msg = new CloudQueueMessage("test message");
        queue.AddMessage(msg, TimeSpan.FromHours(2), TimeSpan.FromMinutes(30));

首先,我们需要一个优先级队列的实现。这是我不久前写的一个。这可能并不理想;它有一个很小的API,它可能会表现得更好,但这是一个足够的起点:

public class PriorityQueue<TPriority, TElement>
{
    SortedDictionary<TPriority, Queue<TElement>> dictionary = new SortedDictionary<TPriority, Queue<TElement>>();
    public PriorityQueue()
    {
    }
    public Tuple<TPriority, TElement> Peek()
    {
        var firstPair = dictionary.First();
        return Tuple.Create(firstPair.Key, firstPair.Value.First());
    }
    public TElement Pop()
    {
        var firstPair = dictionary.First();
        TElement output = firstPair.Value.Dequeue();
        if (!firstPair.Value.Any())
            dictionary.Remove(firstPair.Key);
        return output;
    }
    public void Push(TPriority priority, TElement element)
    {
        Queue<TElement> queue;
        if (dictionary.TryGetValue(priority, out queue))
        {
            queue.Enqueue(element);
        }
        else
        {
            var newQueue = new Queue<TElement>();
            newQueue.Enqueue(element);
            dictionary.Add(priority, newQueue);
        }
    }
}

延迟队列包装起来很简单:

public class DelayQueue<T>
{
    private PriorityQueue<DateTime, T> queue = new PriorityQueue<DateTime, T>();
    public void Enqueue(T item, int delay)
    {
        queue.Push(DateTime.Now.AddMilliseconds(delay), item);
    }
    public T Dequeue()
    {
        if (queue.Peek().Item1 > DateTime.Now)
            return queue.Pop();
        else
            return default(T);
    }
}

如何构建一个包含两步流程的队列来解除条目队列?下面是高级流程:

  • 取消FIFO队列中的第一项;将它的不可见性设置为N分钟(无论你决定不可见性应该是多少)-这允许你使项目在一段时间内不可见,就好像它不存在于队列中一样。这里是我所指的NextVisibleTime属性。

  • Check DequeueCount属性—如果dequeue count为0,表示该项是第一次被请求。忽略这个问题,继续前进。既然它的隐身性被设置好了,它就不会再被取走,直到时机成熟。如果脱队计数为1或更大,则它被脱队一次,并且必须在所需的时间内设置为不可见。

这应该允许您实现延迟队列。我也能想到其他方法。例如,队列中的每个项目作为一个创建时间;这可以用来动态计算一个项目需要保持不可见的时间。要更改属性的不可见性,请检查此方法:http://msdn.microsoft.com/en-us/library/microsoft.windowsazure.storageclient.cloudqueue.updatemessage.aspx