Azure 服务总线队列 - 当消息位于队列中时,QueueClient.Receive() 返回空代理消息

本文关键字:消息 队列 QueueClient Receive 返回 代理 服务 总线 于队列 Azure | 更新日期: 2023-09-27 18:30:23

我在 Azure 服务总线队列中有无法接收的消息。而且我没有得到任何关于问题出在哪里的指标。我认为这与消息大小有关。你可以从下面的代码中看到我正在使用OpenFileDialog。我正在选择 jpeg 图像,它们被发送到队列中。现在,当我发送小于 50KB 的小图像时,接收过程可以很好地显示它们,但超过 100KB 的大图像只是留在队列中。MSDN 说消息大小限制为 256KB,所以我不确定这里发生了什么。

我有两个班级。一个是SendToQueue,另一个是RecvFromQueue。这是代码。

using System;
using System.IO;
using System.Windows.Forms;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;
using Microsoft.WindowsAzure;
namespace ServiceBusQueueApp
{
    public class SendToQueue
    {
        private const string c_testqueue = "TestQueue";
        [STAThreadAttribute]
        static void Main(string[] args)
        {
            // Configure Queue Settings
            QueueDescription qd = new QueueDescription(c_testqueue)
            {
                MaxSizeInMegabytes = 5120,
                DefaultMessageTimeToLive = new TimeSpan(1, 1, 0)
            };
            // Create a new Queue with custom settings
            string connectionString =
                CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString");
            var namespaceManager =
                NamespaceManager.CreateFromConnectionString(connectionString);
            if (!namespaceManager.QueueExists(c_testqueue))
            {
                namespaceManager.CreateQueue(qd);
            }
            namespaceManager.DeleteQueue(qd.Path);
            namespaceManager.CreateQueue(qd);
            QueueClient client = QueueClient.CreateFromConnectionString(connectionString, c_testqueue);
            double maxSize = Math.Pow(2, 18);
            OpenFileDialog openFile = new OpenFileDialog();
            while (true)
            {
                if (openFile.ShowDialog() == DialogResult.Cancel)
                {
                    break;
                }
                var messageBodyStream = new FileStream(openFile.FileName, System.IO.FileMode.Open, FileAccess.Read, FileShare.ReadWrite);
                if (messageBodyStream.Length > maxSize)
                {
                    MessageBox.Show("File is larger than 256KB.");
                    continue;
                }
                BrokeredMessage msg =
                    new BrokeredMessage(messageBodyStream);
                msg.Properties["MyProperty"] = "Test Value";

                try
                {
                    //send msg to the queue
                    client.Send(msg);
                }
                catch (Exception exception)
                {
                    MessageBox.Show(exception.Message);
                    throw;
                }
            }
        }
    }
}

using System;
using System.Diagnostics;
using System.IO;
using System.Threading;
using System.Windows.Forms;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;
using Microsoft.WindowsAzure;
namespace ServiceBusQueueApp
{
    class RecvFromQueue
    {
        private const string c_testqueue = "TestQueue";
        static void Main(string[] args)
        {
            // Create a new Queue with custom settings
            string connectionString =
                CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString");
            var namespaceManager =
                NamespaceManager.CreateFromConnectionString(connectionString);
            if (!namespaceManager.QueueExists(c_testqueue))
            {
                MessageBox.Show("Queue does not exist.");
                throw new Exception("Queue does not exist.");
            }
            QueueClient client = QueueClient.CreateFromConnectionString(connectionString, c_testqueue);
            while (true)
            {
                BrokeredMessage message = client.Receive();
                if (message == null)
                {
                    continue;
                }
                try
                {
                    Stream fstream = message.GetBody<Stream>();
                    byte[] buffer = new byte[fstream.Length];
                    fstream.Read(buffer, 0, (int)fstream.Length);
                    File.WriteAllBytes(@"C:'users'roberthar'pictures'testpic.png", buffer);
                    fstream.Close();
                    Process paint = new Process();
                    paint.StartInfo.FileName = @"C:'Windows'System32'mspaint.exe";
                    paint.StartInfo.Arguments = @"C:'users'roberthar'pictures'testpic.png";
                    paint.Start();
                    Thread.Sleep(3000);
                    paint.Close();
                    // Remove message from queue
                    message.Complete();
                }
                catch (Exception exception)
                {
                    // Indicate a problem, unlock message in queue
                    message.Abandon();
                }
            }
        }
    }
}

Azure 服务总线队列 - 当消息位于队列中时,QueueClient.Receive() 返回空代理消息

邮件大小限制为 256 KB,但这包括标头和正文,其中最大标头大小为 64 KB。在您的情况下,标头不是问题(小于 1 KB)

我运行您的示例,并进行一些小的更改:

            string filePath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory,
                string.Format("testpic_{0}_{1}_{2}.png", now.Hour, now.Minute, now.Second));
            File.WriteAllBytes(filePath, buffer);
            fstream.Close();
            Process paint = new Process();
            paint.StartInfo.FileName = @"C:'Windows'System32'mspaint.exe";
            paint.StartInfo.Arguments = filePath;
            paint.Start();

我能够使用这个 254 KB 的图像成功发送和接收消息

如果你的消息太大,当你调用client.Send(msg);时,你会得到消息大小超过异常

您可以在队列上运行此测试,看看是否可以接收所有消息,它对我来说是通过。

    [Fact]
    public void MaxMessageSize()
    {
        var sender = CreateClient();
        var reciver = CreateClient();
        for (int i = 1; i < 255; i++)
        {
            var size = i*1024;
            var buffer = new byte[size];
            random.NextBytes(buffer);
             BrokeredMessage msg =
               new BrokeredMessage(buffer);
            msg.Properties["size"] = size;
            sender.Send(msg);
            var message  = reciver.Receive();
            Assert.NotNull(message);
            Assert.Equal(message.Properties["size"], size);
            var bufferReceived = message.GetBody<byte[]>();
            Assert.Equal(buffer, bufferReceived);
            message.Complete();
        }
    }

完整的要点在这里

我在帮助同事时偶然发现了这个问题。(我保证这是另一个开发人员!

当他运行检查队列的代码时,我们遇到了这个问题。MessageCount(将其设置为名为myQMessageCount的变量)和代码有"while (myQMessageCount> 0)",他在每次msg后重置队列计数。完成(在同一个 while 循环内)

结果。MessageCount 是队列中所有消息的"总和",包括 Active (您应该能够读取的消息)和死信等。

所以(1),修复是让他更改他的代码来检查ActiveMessageCount,而不是.消息计数

                Microsoft.ServiceBus.Messaging.QueueDescription qd = myMicrosoftdotServiceBusdotNamespaceManager.GetQueue(qName);
                string deadLetterQueueName = QueueClient.FormatDeadLetterPath(qd.Path);
                int activeMessageCount = qd.MessageCountDetails.ActiveMessageCount;
                int deadLetterMessageCount = qd.MessageCountDetails.DeadLetterMessageCount;
                int scheduledMessageCount = qd.MessageCountDetails.ScheduledMessageCount;
                int transferDeadLetterMessageCount = qd.MessageCountDetails.TransferDeadLetterMessageCount;
                int transferMessageCount = qd.MessageCountDetails.TransferMessageCount;

(2),在我们讨论之后,继续检查 ActiveMessageCount 可能并不明智,而只是让返回的空 BrokeredMessage 检查队列中没有更多消息。

无论如何。 我在这里发布这个答案,供未来的读者使用,他们可能会卡在他们正在编写的一些自定义读取队列代码上。