消息已自动删除RabbitMQ

本文关键字:删除 RabbitMQ 消息 | 更新日期: 2023-09-27 18:22:08

我使用了RabbitMQ来存储消息。我注意到消息在应用程序重新启动时被删除。

我在同一个应用程序中有生产者和消费者。

请查找生产者和消费者,如下所示。我使用了持久队列和持久消息。

所以,若队列只有一个使用者,而它当前并没有使用者,那个么队列消息就会被删除。是这样吗?

生产者:

public static void PublishMessage(RequestDto message, string queueName)
    {
        var factory = new ConnectionFactory() { HostName = Config.RabbitMqHostName, Port = Config.RabbitMqPortNumber };
        using (var connection = factory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queueName, true, false, false, null);
                var properties = channel.CreateBasicProperties();
                properties.SetPersistent(true);
               // properties.DeliveryMode = 2; I have used this too.
                string serializesMessage = Utility.SerializeSoapObject(message);
                var messageBytes = Encoding.UTF8.GetBytes(serializesMessage);
                channel.BasicPublish("", queueName, properties , messageBytes);
                Log.Info("Record added into queue : 'nMessage: " + serializesMessage);
            }
        }
    }

消费者:

var factory = new ConnectionFactory() { HostName = Config.RabbitMqHostName, Port = Config.RabbitMqPortNumber };
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.QueueDeclare(Config.RabbitMqQueueName, true, false, false, null);
                    var consumer = new QueueingBasicConsumer(channel);
                    channel.BasicConsume(Config.RabbitMqQueueName, true, consumer);
                    while (DoProcessMessage())
                    {
                          try
                            {
                                List<RequestDto> messages = GetMessagesInBatch(consumer);
                                if (messages.Count > 0)
                                {
                                    ProcessMessageInParallel(messages);
                                }
                                else
                                {
                                    Producer.FillRequestMessages();
                                }
                            }
                            catch (Exception exception)
                            {
                                Log.Error("StartConsumer - Failed to process message from RabbitMq Error: " + exception.Message, exception);
                            }
                    }
                }
            }
        }
        catch (Exception exception)
        {
            Log.Error(exception.Message, exception);
        }
    private bool DoProcessMessage()
    {
        return Config.MaxRequestPerDayCount > 1000;
    }

如果有人能帮忙的话。

消息已自动删除RabbitMQ

您似乎正在将noAck = true传递给basicConsume函数:https://www.rabbitmq.com/releases/rabbitmq-java-client/v1.7.0/rabbitmq-java-client-javadoc-1.7.0/com/rabbitmq/client/Channel.html#basicConsume(java.lang.String,boolean,com.rabbitmq.client.Consumer)

在无ack模式下,RabbitMQ将向使用者发送消息,并立即将其从队列中删除。