使用一个或多个标准FIFO队列实现延迟队列
本文关键字:队列 标准 FIFO 延迟 实现 一个 | 更新日期: 2023-09-27 17:50:37
延迟队列是这样一种队列,其中每个消息都有一个与之关联的延迟时间,并且只有当其延迟过期时,消息才能被接收。队列的头是过去延迟过期时间最长的消息。如果没有延迟过期,则没有head, dequeue将返回null。
实际上,我正在使用Azure编写云应用程序,在Azure中只有FIFO队列可用,而不是优先级/延迟队列。所以我来这里是想看看有没有人能给我一些指点,让我在正确的方向上开始。我在谷歌上搜索了很多,但只找到了Java中的延迟队列实现,没有标准的教程/研究论文讨论延迟队列的一般情况。
编辑:我有什么代码?
实际上,我必须先设计这些东西,然后把它交给我的经理,一旦我们完成了设计,只有我才能开始编码。
关于场景的更多详细信息
它是一个基于主/从模型的分布式应用程序。主服务器生成消息并将其放入Azure服务总线队列中,并且有多个从服务器(运行在多台机器上)从队列中读取并处理消息。如果一个主服务器发生故障,那么其中一个从服务器充当主服务器并开始生成消息。我不想在master中存储任何状态信息因为一旦master失效所有的状态信息也会随之消失
Windows Azure队列消息具有延迟(以秒为单位),您可以在将消息插入队列时指定该延迟。在达到超时延迟之前,消息将不可见。请参阅这篇MSDN文章以查看API的详细信息。
不可见超时也在各种语言SDK实现中实现。由于您正在使用c#,下面是AddMessage()
调用的样子。注意AddMessage()
的第三个参数指定了不可见超时:
var acct = CloudStorageAccount.DevelopmentStorageAccount;
var queueClient = acct.CreateCloudQueueClient();
var queue = queueClient.GetQueueReference("myqueue");
queue.CreateIfNotExist();
var msg = new CloudQueueMessage("test message");
queue.AddMessage(msg, TimeSpan.FromHours(2), TimeSpan.FromMinutes(30));
首先,我们需要一个优先级队列的实现。这是我不久前写的一个。这可能并不理想;它有一个很小的API,它可能会表现得更好,但这是一个足够的起点:
public class PriorityQueue<TPriority, TElement>
{
SortedDictionary<TPriority, Queue<TElement>> dictionary = new SortedDictionary<TPriority, Queue<TElement>>();
public PriorityQueue()
{
}
public Tuple<TPriority, TElement> Peek()
{
var firstPair = dictionary.First();
return Tuple.Create(firstPair.Key, firstPair.Value.First());
}
public TElement Pop()
{
var firstPair = dictionary.First();
TElement output = firstPair.Value.Dequeue();
if (!firstPair.Value.Any())
dictionary.Remove(firstPair.Key);
return output;
}
public void Push(TPriority priority, TElement element)
{
Queue<TElement> queue;
if (dictionary.TryGetValue(priority, out queue))
{
queue.Enqueue(element);
}
else
{
var newQueue = new Queue<TElement>();
newQueue.Enqueue(element);
dictionary.Add(priority, newQueue);
}
}
}
延迟队列包装起来很简单:
public class DelayQueue<T>
{
private PriorityQueue<DateTime, T> queue = new PriorityQueue<DateTime, T>();
public void Enqueue(T item, int delay)
{
queue.Push(DateTime.Now.AddMilliseconds(delay), item);
}
public T Dequeue()
{
if (queue.Peek().Item1 > DateTime.Now)
return queue.Pop();
else
return default(T);
}
}
如何构建一个包含两步流程的队列来解除条目队列?下面是高级流程:
-
取消FIFO队列中的第一项;将它的不可见性设置为N分钟(无论你决定不可见性应该是多少)-这允许你使项目在一段时间内不可见,就好像它不存在于队列中一样。这里是我所指的NextVisibleTime属性。
-
Check DequeueCount属性—如果dequeue count为0,表示该项是第一次被请求。忽略这个问题,继续前进。既然它的隐身性被设置好了,它就不会再被取走,直到时机成熟。如果脱队计数为1或更大,则它被脱队一次,并且必须在所需的时间内设置为不可见。
这应该允许您实现延迟队列。我也能想到其他方法。例如,队列中的每个项目作为一个创建时间;这可以用来动态计算一个项目需要保持不可见的时间。要更改属性的不可见性,请检查此方法:http://msdn.microsoft.com/en-us/library/microsoft.windowsazure.storageclient.cloudqueue.updatemessage.aspx