Azure服务总线ReceiveBatch()的奇怪行为
本文关键字:服务 总线 ReceiveBatch Azure | 更新日期: 2023-09-27 18:07:25
目前正在使用Azure服务总线主题,并且遇到使用ReceiveBatch方法接收我的消息的问题。问题是,预期的结果实际上并不是我得到的结果。下面是基本的代码设置,用例如下:
SubscriptionClient client = SubscriptionClient.CreateFromConnectionString(connectionString, convoTopic, subName);
IEnumerable<BrokeredMessage> messageList = client.ReceiveBatch(100);
foreach (BrokeredMessage message in messageList)
{
try
{
Console.WriteLine(message.GetBody<string>() + message.MessageId);
message.Complete();
}
catch (Exception ex)
{
message.Abandon();
}
}
client.Close();
MessageBox.Show("Done");
使用上面的代码,如果我发送4条消息,那么在第一次运行时进行轮询,我得到第一条消息。在第二次循环中,我得到了另外3个。我希望同时得到所有4个。它似乎总是在第一次投票时返回一个奇异值,然后在随后的投票中返回其余的值。(与3和5相同的结果,我在第二次尝试中获得n-1个消息,在第一次尝试中获得1个消息)。
如果我有0条消息要接收,则该操作需要~30-60秒才能获得messageList(计数为0)。
如果我将代码更改为
IEnumerable<BrokeredMessage> messageList = client.ReceiveBatch(100, new Timespan(0,0,0));
,那么问题#2就会消失,因为问题1仍然存在,我必须调用代码两次才能获得所有消息。
我假设问题#2是由于我在#3中覆盖的默认超时值(尽管我发现如果消息存在,它会立即响应而无需等待默认时间,这令人困惑)。我不知道为什么我从来没有在单个ReceiveBatch中收到全部的消息。
我让ReceiveBatch()正常工作的方式是做两件事。
- 在主题中禁用分区(我不得不为此创建一个新主题,因为你不能在创建后切换)
- 在每个订阅上启用批处理,如下所示: 列表项
SubscriptionDescription sd = new SubscriptionDescription(topicName, orgSubName);
sd.EnableBatchedOperations = true;
在我做了这两件事之后,我能够使用IEnumerable<BrokeredMessage> messageList = client.ReceiveBatch(100, new TimeSpan(0,0,0));
我在ASB队列中遇到了类似的问题。我发现我可以通过在接收批处理之前增加客户机上的PrefetchCount
来减轻它:
SubscriptionClient client = SubscriptionClient.CreateFromConnectionString(connectionString, convoTopic, subName);
client.PrefetchCount = 100;
IEnumerable<BrokeredMessage> messageList = client.ReceiveBatch(100);
来自使用服务总线代理消息传递提高性能的Azure服务总线最佳实践:
预取允许队列或订阅客户端在执行接收操作时从服务加载额外的消息。
…
当使用默认的锁过期时间为60秒时,一个较好的值为
SubscriptionClient.PrefetchCount
是工厂所有接收器最大处理率的20倍。例如,工厂创建3个接收器,每个接收器每秒最多可以处理10条消息。预取次数不能超过20*3*10 = 600。…
预取消息增加了队列或订阅的总体吞吐量,因为它减少了消息操作或往返的总数。然而,获取第一条消息将花费更长的时间(由于消息大小的增加)。接收预取的消息会更快,因为这些消息已经被客户端下载了。
只是拼图的几个部分。即使在启用批处理和禁用分区之后,我仍然无法让它工作-我仍然必须做两个ReceiveBatch调用。然而,我确实发现:
- 重新启动服务总线服务(我使用Windows Server的服务总线)为我解决了这个问题。
- 执行单个ReceiveBatch并且不采取任何操作(让消息锁过期),然后执行另一个ReceiveBatch导致所有消息同时通过。(对所有消息进行初始化ReceiveBatch并调用放弃不会导致该行为。)
所以它似乎是某种损坏/错误在服务总线的内存缓存