如何耗尽 Azure MessageReceiver 消息泵

本文关键字:消息 MessageReceiver Azure 何耗尽 | 更新日期: 2023-09-27 18:36:55

我有一个MessageReceiver从队列中抽取消息:

var factory = MessagingFactory.CreateFromConnectionString(connectionString);
var receiver = factory.CreateMessageReceiver(queuePath);
receiver.OnMessageAsync(HandleBrokeredMessageAsync);

HandleBrokeredMessageAsync是接收者将消息泵入的我的委托。

当我在接收方上调用Close()时,它将停止从队列中抽取更多消息。为了避免潜在的争用条件,我希望在返回控制权之前确保所有挂起的处理都已完成。

我考虑过跟踪每个呼叫以HandleBrokeredMessageAsync ConcurrentBag<T>,完成后将它们从袋子中取出。我会使用 BlockingCollection<T> 来阻止进程,直到排空完成,但不清楚何时调用CompleteAdding():我会在调用Close()后调用它,但是调用Close()和随后传递给处理程序的消息之间是否有间隙?

receiver.Close();
pendingMessages.CompleteAdding(); 
// Can additional messages be pumped after this?

如何耗尽 Azure MessageReceiver 消息泵

请看一下此处的示例,因为它使用 ManualResetEvent 来协调 Run() 方法的关闭和 Close 操作。类似的方法可能适用于在消息处理中检查并在那里停止(不接受下一条消息进行处理),然后在所有这些并发处理器完成后调用 close?

https://stackoverflow.com/a/16739691/1078351