Azure服务总线队列消息在Message.Abandon之后有死信

本文关键字:之后 Abandon Message 服务 总线 队列 消息 Azure | 更新日期: 2023-09-27 18:27:03

我正在试用Azure服务总线队列。我有以下代码:

队列发送:

string strConnectionString = ConfigurationManager.AppSettings["Microsoft.ServiceBus.ConnectionString"];
        var namespaceManager = NamespaceManager.CreateFromConnectionString(strConnectionString);
        if (!namespaceManager.QueueExists("Test"))
        {
            QueueDescription qD = new QueueDescription("Test");
            qD.DefaultMessageTimeToLive = new TimeSpan(05, 00, 00);
            qD.LockDuration = new TimeSpan(00, 02, 30);
            qD.MaxSizeInMegabytes = 5120;                
            namespaceManager.CreateQueue(qD);              
        }
        if (namespaceManager.QueueExists("Test"))
        {
            QueueClient client = QueueClient.CreateFromConnectionString(strConnectionString, "Test", ReceiveMode.PeekLock);
            var qMessage = Console.ReadLine();
            using (MemoryStream strm = new MemoryStream(Encoding.UTF8.GetBytes(qMessage)))
            {
                BrokeredMessage bMsg = new BrokeredMessage(strm);
                bMsg.MessageId = Guid.NewGuid().ToString();
                bMsg.TimeToLive = new TimeSpan(05, 00, 00);
                client.Send(bMsg);
                Console.WriteLine("Message sent");
            }
        }
        Console.ReadLine();

接收代码:

 string strConnectionString = ConfigurationManager.AppSettings["Microsoft.ServiceBus.ConnectionString"];
        var namespaceManager = NamespaceManager.CreateFromConnectionString(strConnectionString);
        if (namespaceManager.QueueExists("Test"))
        {
            QueueClient client = QueueClient.CreateFromConnectionString(strConnectionString, "Test",ReceiveMode.PeekLock);
            if (client != null)
            {
                OnMessageOptions options = new OnMessageOptions();
                options.AutoComplete = false;
                options.AutoRenewTimeout = TimeSpan.FromSeconds(31);                 

                client.OnMessage((message) =>
                    {                           
                        Console.WriteLine(message.State.ToString());
                        Console.WriteLine("Message Id: " + message.MessageId);
                        Stream stream = message.GetBody<Stream>();
                        StreamReader reader = new StreamReader(stream);
                        Console.WriteLine("Message: " + reader.ReadToEnd());                                
                        Console.WriteLine("***************");
                        message.Abandon();
                    });
                Console.ReadLine();
            }
        }

我看到每当我调用放弃时,消息都会变为DeadLettered。我的假设是,它应该处于活动状态,可以被另一个客户使用。

Azure服务总线队列消息在Message.Abandon之后有死信

您对BrokeredMessage的理解。放弃Api是正确的。其目的是放弃在消息上获取的peek锁(但NOT放弃消息本身),从而使其他接收者可以获取消息。

以下是我们如何设想偷看信息的不同状态:

基础知识优先

"为什么":如果客户需要竞争消费者(作业队列)语义,即他们需要多个工人同时处理来自一个队列的不同消息,并保证一次,那么他们将使用ReceiveMode.PeekLock。在该模型中,每个工作者(队列接收器)都需要一种方法来将其当前消息(Job)的进度传达给其他工作者。因此,brokeredMessage提供了4个函数来表示状态。

"What"

  • 如果当前Worker成功处理了消息,请调用BrokeredMessage.Complete()
  • 如果当前工作进程无法处理BrokeredMessage,并且希望在另一个工作进程上重试处理,则放弃该消息。但是,这里的问题是:比方说,有两个工人,他们每个人都认为另一个可以处理这个消息,并调用"放弃"——很快他们就会陷入一个无限循环,试图只处理那个消息!因此,为了避免这种情况,我们在QueueDescription上提供了一个名为MaxDeliveryCount的配置。此设置保护消息从队列传递到接收方的次数限制在上面的示例中,每次接收(和放弃)消息时,ServiceBus服务上的"deliveryCount"都会递增。当它达到10时,消息已达到最大发送次数,因此将被封为死信
  • 如果当前接收方(工作者)确信,则无法处理此消息,BrokeredMessage.DadLetter()。此处的目标是让使用应用程序定期审核死信消息
  • 如果当前接收方(工作者)无法处理此消息,但是知道可以在稍后的时间点BrokeredMessage.Defer()处理此消息

啊!Sree