是否调用Azure ServiceBus OnMessage阻塞

本文关键字:OnMessage 阻塞 ServiceBus Azure 调用 是否 | 更新日期: 2023-09-27 18:01:57

我们正在使用Azure ServiceBus队列来处理大量的客户端请求。然而,OnMessage调用似乎是一个阻塞调用,然而,如果它确实是一个阻塞调用,那么这个调用的阻塞是令人难以置信的不一致。

我试图完成的是从web服务应用程序永久地监视队列(允许从运行的应用程序中挖掘指标)

我正在创建下面的订阅:

protected virtual void Subscribe(string queueName, Func<QueueRequest, bool> callback)
{
    var client = GetClient(queueName, PollingTimeout);
    var transformCallback = new Action<BrokeredMessage>((message) =>
    {
        try
        {
            var request = message.ToQueueRequest();
            if (callback(request))
            {
                message.Complete();
            }
            else
            {
                message.Abandon();
            }
        }
        catch (Exception ex)
        {
            //TODO: Log the error
            message.Abandon();
        }
    });
    var options = new OnMessageOptions
    {
        MaxConcurrentCalls = _config.GetInt("MaxThreadsPerQueue"),
        AutoComplete = false
    };
    options.ExceptionReceived += OnMessageError;
    client.OnMessage(transformCallback, options);
}

如果只调用一次订阅,则应用程序将停止监视队列,从而停止处理消息。但是,如果我在订阅调用周围放置一个while循环。因此,我非常犹豫地写了下面的代码片段,以便在此OnMessage完成后重新订阅。

protected void MonitorQueue()
{
    IsRunning = true;
    while (IsRunning)
    {
        try
        {
            Log.Info("MonitoringThread: OnMessage beginning logging for {0}", QueueName);
            QueueClient.Subscribe(QueueName, Processor);
            Log.Info("MonitoringThread:  OnMessage ended logging for {0}", QueueName);
        }
        catch (Exception ex)
        {
            IsRunning = false;
            Log.Error("MonitoringThread: Error in subscription for {0}: ", ex, QueueName);
        }
        if (SleepBeforeReinit > 0 && IsRunning)
        {
            Thread.Sleep(SleepBeforeReinit);
        }
    }
}

这修复了由于未被拾取而导致消息过期为死信的问题,但是这会引起其他问题。

OnMessage是一个可计费的操作,当我看到日志文件告诉我队列的开始和结束间隔不到一秒,并且我的日志文件的大小增加非常迅速时,我很担心。

我将MessagingFactory设置为1天的OperationTimeout,但这似乎并没有像我期望的那样影响订阅打开/关闭状态的频率。

我见过很多这样做的例子作为工人角色,但是这不会完成我们想要做的事情。我目前从我们的web应用程序的Global.asax.cs布线。任何建议都非常感谢!

是否调用Azure ServiceBus OnMessage阻塞

OnMessage和OnMessageAsync不阻塞调用。这些需要实例化一次,并将继续订阅队列,直到应用程序终止。

请参阅相关文章了解更多详细信息:Azure服务总线,确定OnMessage是否停止处理

您的第一种方法是正确的,并且MaxThreadsPerQueue的值决定了有多少线程可用来处理您的消息。听起来好像这些线程已经用完了(也许回调对消息的处理花费了一些时间?)。

你应该考虑使用QueueClient。OnMessageAsync方法接收你的消息