是否调用Azure ServiceBus OnMessage阻塞
本文关键字:OnMessage 阻塞 ServiceBus Azure 调用 是否 | 更新日期: 2023-09-27 18:01:57
我们正在使用Azure ServiceBus队列来处理大量的客户端请求。然而,OnMessage
调用似乎是一个阻塞调用,然而,如果它确实是一个阻塞调用,那么这个调用的阻塞是令人难以置信的不一致。
我试图完成的是从web服务应用程序永久地监视队列(允许从运行的应用程序中挖掘指标)
我正在创建下面的订阅:
protected virtual void Subscribe(string queueName, Func<QueueRequest, bool> callback)
{
var client = GetClient(queueName, PollingTimeout);
var transformCallback = new Action<BrokeredMessage>((message) =>
{
try
{
var request = message.ToQueueRequest();
if (callback(request))
{
message.Complete();
}
else
{
message.Abandon();
}
}
catch (Exception ex)
{
//TODO: Log the error
message.Abandon();
}
});
var options = new OnMessageOptions
{
MaxConcurrentCalls = _config.GetInt("MaxThreadsPerQueue"),
AutoComplete = false
};
options.ExceptionReceived += OnMessageError;
client.OnMessage(transformCallback, options);
}
如果只调用一次订阅,则应用程序将停止监视队列,从而停止处理消息。但是,如果我在订阅调用周围放置一个while循环。因此,我非常犹豫地写了下面的代码片段,以便在此OnMessage
完成后重新订阅。
protected void MonitorQueue()
{
IsRunning = true;
while (IsRunning)
{
try
{
Log.Info("MonitoringThread: OnMessage beginning logging for {0}", QueueName);
QueueClient.Subscribe(QueueName, Processor);
Log.Info("MonitoringThread: OnMessage ended logging for {0}", QueueName);
}
catch (Exception ex)
{
IsRunning = false;
Log.Error("MonitoringThread: Error in subscription for {0}: ", ex, QueueName);
}
if (SleepBeforeReinit > 0 && IsRunning)
{
Thread.Sleep(SleepBeforeReinit);
}
}
}
这修复了由于未被拾取而导致消息过期为死信的问题,但是这会引起其他问题。
OnMessage是一个可计费的操作,当我看到日志文件告诉我队列的开始和结束间隔不到一秒,并且我的日志文件的大小增加非常迅速时,我很担心。
我将MessagingFactory
设置为1天的OperationTimeout
,但这似乎并没有像我期望的那样影响订阅打开/关闭状态的频率。
我见过很多这样做的例子作为工人角色,但是这不会完成我们想要做的事情。我目前从我们的web应用程序的Global.asax.cs布线。任何建议都非常感谢!
OnMessage和OnMessageAsync不阻塞调用。这些需要实例化一次,并将继续订阅队列,直到应用程序终止。
请参阅相关文章了解更多详细信息:Azure服务总线,确定OnMessage是否停止处理
您的第一种方法是正确的,并且MaxThreadsPerQueue的值决定了有多少线程可用来处理您的消息。听起来好像这些线程已经用完了(也许回调对消息的处理花费了一些时间?)。
你应该考虑使用QueueClient。OnMessageAsync方法接收你的消息