RabbitMQ and C#
本文关键字:and RabbitMQ | 更新日期: 2023-09-27 18:14:58
使用RabbitMQ是否有一种类似于MSSMQ的方法,可以从队列中弹出1000条消息,然后对数据库进行插入,并从那里继续。
我似乎无法通过订阅通道做到这一点,然后在订阅中的BasicDeliveryEventArgs上执行foreach,并使用我想在给定时间处理的最大消息计数执行If语句。
提前感谢然而,这仍然从队列
中获取所有22k条消息。using (IConnection connection = factory.CreateConnection())
{
using (IModel channel = connection.CreateModel())
{
channel.QueueDeclare("****", true, false, false, null);
var subscription = new Subscription(channel, "****", false);
int maxMessages = 5;
int i = 0;
foreach (BasicDeliverEventArgs eventArgs in subscription)
{
if (++i == maxMessages)
{
Console.WriteLine("Took 5 messages");
subscription.Ack(eventArgs);
break;
}
}
}
}
我假设您希望通过将消息组分批处理成更大的事务来优化消息加载到数据库中,而不是消耗每个消息的事务成本。有了强制性的警告,这样做意味着大组消息可能一起失败,即使其中只有一个导致问题,以下是您如何处理的方法…
设置通道上的QOS:
channel.BasicQos(0, 1000, false);
这将预取1000条消息并阻止进一步的流量,直到您确认某些内容。注意,它不会以1000为块进行获取。相反,它确保每次预取最多1000条UNACK消息。模拟块传输就像首先处理1000条消息一样简单,然后一次对它们进行确认。
可以在这里和这里找到比我更权威的解释。
还有一点:您可能希望在消息可用时立即刷新队列,即使您还没有达到1000条消息的配额。您应该能够通过在foreach
循环中调用queue.BasicGet()
来做到这一点,直到它运行枯竭,然后将您拥有的任何内容(包括从subscription
中取出的消息)传递到数据库。警告:我自己没有尝试过,所以我可能是在胡说八道,但我认为它会起作用。这种方法的优点在于,它可以立即将消息推送到数据库中,而不必等待1000条消息的完整批处理。如果数据库因为处理太多的小事务而落后,预取积压会在每个周期之间被填满。