Azure服务总线-使用OnMessage()方法接收消息
本文关键字:方法 消息 OnMessage 服务 总线 使用 Azure | 更新日期: 2023-09-27 18:17:48
按照MS文档,接收来自订阅的消息并不困难。但是,如果我想让我的应用程序在每次发布新消息时都接收到一条消息,则需要一个恒定的轮询。因此SubscriptionClient类的OnMessage()方法。
MS文档说: "…当调用OnMessage时,客户端启动一个内部消息泵,不断地轮询队列或订阅。此消息泵由发出Receive()调用的无限循环组成。如果调用超时,它发出下一个Receive()调用. ..."
但是当应用程序运行时,调用moment OnMessage()方法时,只接收到最新消息。当发布新消息时,恒定轮询似乎不起作用。在尝试了许多不同的方法之后,我可以使此工作并使应用程序在接收到新消息的那一刻做出反应的唯一方法是将代码放入具有无限循环的单独任务中。这在很多层面上都是完全错误的!(见下面的代码)。
谁能帮助我纠正我的代码或张贴工作样本,以完成相同的功能没有循环?谢谢你!
public void ReceiveMessageFromSubscription(string topicName, string subscriptionFilter)
{
var newMessage = new MessageQueue();
int i = 0;
Task listener = Task.Factory.StartNew(() =>
{
while (true)
{
SubscriptionClient Client = SubscriptionClient.CreateFromConnectionString(connectionString, topicName, subscriptionFilter);
Dictionary<string, string> retrievedMessage = new Dictionary<string, string>();
OnMessageOptions options = new OnMessageOptions();
options.AutoComplete = false;
options.AutoRenewTimeout = TimeSpan.FromMinutes(1);
Client.OnMessage((message) =>
{
try
{
retrievedMessage.Add("messageGuid", message.Properties["MessageGuid"].ToString());
retrievedMessage.Add("instanceId", message.Properties["InstanceId"].ToString());
retrievedMessage.Add("pId", message.Properties["ProcessId"].ToString());
retrievedMessage.Add("processKey", message.Properties["ProcessKey"].ToString());
retrievedMessage.Add("message", message.Properties["Message"].ToString());
newMessage.AnnounceNewMessage(retrievedMessage); // event ->
message.Complete(); // Remove message from subscription.
}
catch (Exception ex)
{
string exmes = ex.Message;
message.Abandon();
}
}, options);
retrievedMessage.Clear();
i++;
Thread.Sleep(3000);
}
});
}
您的代码有几个问题需要解决-
- 它失败了,我假设你的应用程序然后退出-或在至少侦听消息的线程终止了。
- 你的while循环不断重复代码来连接消息处理程序,你只需要做一次。
- 你需要一种方法来保持调用堆栈存活,并防止你的应用程序垃圾收集你的对象。
下面的内容可以帮助你走向成功。祝你好运。
ManualResetEvent CompletedResetEvent = new ManualResetEvent(false);
SubscriptionClient Client;
public void ReceiveMessagesFromSubscription(string topicName, string subscriptionFilter, string connectionString)
{
Task listener = Task.Factory.StartNew(() =>
{
// You only need to set up the below once.
Client = SubscriptionClient.CreateFromConnectionString(connectionString, topicName, subscriptionFilter);
OnMessageOptions options = new OnMessageOptions();
options.AutoComplete = false;
options.AutoRenewTimeout = TimeSpan.FromMinutes(1);
options.ExceptionReceived += LogErrors;
Client.OnMessage((message) =>
{
try
{
Trace.WriteLine("Got the message with ID {0}", message.MessageId);
message.Complete(); // Remove message from subscription.
}
catch (Exception ex)
{
Trace.WriteLine("Exception occurred receiving a message: {0}" + ex.ToString());
message.Abandon(); // Failed. Leave the message for retry or max deliveries is exceeded and it dead letters.
}
}, options);
CompletedResetEvent.WaitOne();
});
}
/// <summary>
/// Added in rudimentary exception handling .
/// </summary>
/// <param name="sender">The sender.</param>
/// <param name="ex">The <see cref="ExceptionReceivedEventArgs"/> instance containing the event data.</param>
private void LogErrors(object sender, ExceptionReceivedEventArgs ex)
{
Trace.WriteLine("Exception occurred in OnMessage: {0}" + ex.ToString());
}
/// <summary>
/// Call this to stop the messages arriving from subscription.
/// </summary>
public void StopMessagesFromSubscription()
{
Client.Close(); // Close the message pump down gracefully
CompletedResetEvent.Set(); // Let the execution of the listener complete and terminate gracefully
}
或者,您可以使用ReceiveBatch:
以更传统的方式将消息分组。var messages = await queueClient.ReceiveBatchAsync(10, TimeSpan.FromSeconds(30),
cancellationToken);