服务总线如何在服务器重启后重新连接(非azure)
本文关键字:重新连接 azure 重启 总线 服务器 服务 | 更新日期: 2023-09-27 18:13:43
我使用的是服务总线1.1 on Premise,我正在构建一个WindowsService,它应该只是监听一个特定的队列,在另一台机器上运行。
连接到SB的WindowsService代码如下:
MessagingFactory _receiverFactory = MessagingFactory.CreateFromConnectionString(sbConnectionString);
MessageReceiver _messageReceiver = _receiverFactory.CreateMessageReceiver(listeningQueue);
BrokeredMessage brokeredMessage = _messageReceiver.Receive(TimeSpan.FromSeconds(60));
一切工作正常,直到我重新启动服务器:我注意到,如果WindowsService在所有服务总线服务可用之前启动,我的应用程序将永远不会连接,直到我手动重启我的WindowsService。
实际上,如果在我构建MessagingFactory时SB没有完全运行,则接收指令将始终失败,即使SB服务在第二次返回…
我试图拦截异常(MessagingCommunicationException)并重建MessagingFactory,但根本不起作用,重新连接的唯一方法是重新启动WindowsService。
有什么建议吗?
- 设置高可用性("HA")服务场
但即使你做或不做#1"HA",你也应该做#2。
使用"Polly"捕获并自动重试特定的异常
using System; using System.Collections.Generic; using System.Linq; using System.Threading; using Microsoft.ServiceBus; using Microsoft.ServiceBus.Messaging; using Polly; private static void KeepReadingTheQueue(QueueClient qc) { int retries = 0; int eventualFailures = 0; Policy pol = Policy.Handle<System.OperationCanceledException>(ex => null != ex) .Or<UnauthorizedAccessException>(ex => null != ex) .Or<Microsoft.ServiceBus.Messaging.MessagingCommunicationException>() .Or<Microsoft.ServiceBus.Messaging.MessageLockLostException>() .WaitAndRetryForever( sleepDurationProvider: attempt => TimeSpan.FromMilliseconds(500), // Wait 500ms between each try. onRetry: (exception, calculatedWaitDuration, context) => // Capture some info for logging! { ReportDuplicatesOrMissing("Policy.OnRetry", batchNumbers); // This is your new exception handler! // Tell the user what they've won! Console.WriteLine("Log, then retry: " + exception.Message, ConsoleColor.Yellow); retries++; }); int i = 0; int receiveBatchSize = 1; /* i have this in a custom settings class, hard coded here */ while (true) { i++; ReportDuplicatesOrMissing("while.true", batchNumbers); try { // Retry the following call according to the policy - 15 times. pol.Execute(() => { IEnumerable<BrokeredMessage> messages = null; if (receiveBatchSize <= 1) { // Receive the message from the queue BrokeredMessage receivedMessage = qc.Receive(); List<BrokeredMessage> listMsgs = new List<BrokeredMessage>(); listMsgs.Add(receivedMessage); messages = listMsgs as IEnumerable<BrokeredMessage>; } else { messages = qc.ReceiveBatch(receiveBatchSize); int count = messages.Count(); if (count > 0) { Console.WriteLine("ReceiveBatch.Count='{0}'", count); } } foreach (BrokeredMessage receivedMessage in messages) { if (receivedMessage != null) { MyPayLoadObject concreteMsgObject = receivedMessage.GetBody<MyPayLoadObject>(); receivedMessage.Complete(); Console.WriteLine(@"Message received: {0}", concreteMsgObject); } else { Console.WriteLine(@"No new messages in the queue"); Thread.Sleep(1000); } Thread.Sleep(100); } }); } catch (Exception e) { Console.WriteLine("Request " + i + " eventually failed with: " + e.Message, ConsoleColor.Red); eventualFailures++; } } }
packages.config
<?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="Microsoft.WindowsAzure.ConfigurationManager" version="2.0.1.0" targetFramework="net45" />
<package id="Polly" version="4.3.0" targetFramework="net45" />
<package id="WindowsAzure.ServiceBus" version="2.1.4.0" targetFramework="net45" />
</packages>