在C#中,如何处理当前队列中的所有RabbitMQ消息
本文关键字:队列 消息 RabbitMQ 处理 何处理 | 更新日期: 2023-09-27 18:21:16
RabbitMQ基本教程提供了一个如何从队列中连续检索消息的示例:
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.QueueDeclare("hello", false, false, false, null);
var consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume("hello", true, consumer);
Console.WriteLine(" [*] Waiting for messages." +
"To exit press CTRL+C");
while (true)
{
var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
}
}
}
我想做的是检索所有已放入队列的消息,然后停止。
这里有两个例子可以解决我的问题
- 如果我在下午1点开始我的代码,我希望在下午1点钟之前处理所有已放入队列的消息
或
- 如果我在13:00:00开始我的代码,并且我的代码需要10秒才能运行,我不介意它是否包括13:00:00到13:00:10之间放置在队列中的消息,只要队列一空它就停止
我意识到我可能可以在消息中加一个时间戳并检查它,或者我可以篡改超时值,但我想知道是否有任何内置的方法可以正确地做到这一点。
提前谢谢。
从评论中可以看出,RabbitMQ似乎不适合批量处理,因此它并不是为此目的而设计的。
我还注意到,当测试DequeueNoWait方法时,或者尝试在零超时的情况下进行Dequeue时,它们根本不起作用,只是返回null。
以下解决方案使用QueueDeclare来获取现有消息的计数,并且不需要时间戳或黑客超时:
var factory = new ConnectionFactory { HostName = "localhost" };
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
var queueDeclareResponse = channel.QueueDeclare(Constants.QueueName, false, false, false, null);
var consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume(Constants.QueueName, true, consumer);
Console.WriteLine(" [*] Processing existing messages.");
for (int i = 0; i < queueDeclareResponse.MessageCount; i++)
{
var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
}
Console.WriteLine("Finished processing {0} messages.", queueDeclareResponse.MessageCount);
Console.ReadLine();
}
}
您可以做什么:
1.)首先向消息对象添加时间戳
2.)如果时间戳无效,则拒绝消息。它是rabbitmq的集成功能
参考