QueueClient.Receive()的异步方法

本文关键字:异步方法 Receive QueueClient | 更新日期: 2023-09-27 17:59:13

我正在使用服务总线连接web角色和工作者角色。我的工作者角色处于连续循环中,我正在接收web角色使用QueueClient.Receive()方法发送的消息。

但是使用这种方法,如果服务总线队列上没有消息,它会等待几秒钟来接收消息,而不是移动到下一行进行进一步执行。我希望有一些异步的方法来接收消息?或者至少有一些设置等待时间的方法?

我从QueueClient的msdn文档中找到了这个BeginReceive方法,我希望这能回答我的问题,但我不知道如何使用这个方法。方法参数是异步回调和对象状态,我不知道它们是什么。

有什么想法吗?

更新:多亏了Sandrino的出色解决方案,它可以异步工作。但是异步现在给我带来了一些问题。我的VS崩溃了。不确定问题出在哪里。下面是我使用的代码。

工作人员角色:

public override void Run()
    {
while (!IsStopped)
        {                
                // Receive the message from Web Role to upload the broadcast to queue
                BroadcastClient.BeginReceive(OnWebRoleMessageReceived, null);                    
                // Receive the message from SignalR BroadcastHub
                SignalRClient.BeginReceive(OnSignalRMessageReceived, null);                    
            }
}

public void OnWebRoleMessageReceived(IAsyncResult iar)
    {
        BrokeredMessage receivedBroadcastMessage = null;
        receivedBroadcastMessage = BroadcastClient.EndReceive(iar);
        if (receivedBroadcastMessage != null)
        {   //process message
           receivedBroadcastMessage.Complete();
        }
    }
public void OnSignalRMessageReceived(IAsyncResult iar)
    {
        BrokeredMessage receivedSignalRMessage = null;
        receivedSignalRMessage = SignalRClient.EndReceive(iar);
        if (receivedSignalRMessage != null)
        {
            //process message
           receivedSignalRMessage.Complete();
           WorkerRoleClient.Send(signalRMessage);
        }
     }

我是不是错过了什么让VS过度工作和崩溃的东西?因为在切换到BeginReceive之前,当iw使用QueueClient.Receive时,它运行良好,没有崩溃。

感谢

QueueClient.Receive()的异步方法

BeginReceive方法是您的案例中的方法。你通常会这样称呼它:

void SomeMethod() 
{
     ...
     client.BeginReceive(TimeSpan.FromMinutes(5), OnMessageReceived, null);
     ...
}
void OnMessageReceived(IAsyncResult iar)
{
     var msg = client.EndReceive(iar);
     if (msg != null)
     {
         var body = msg.GetBody<MyMessageType>();
         ...
     }
}

我就是这样做的(扩展Sandrino De Mattia的解决方案):

void SomeMethod() 
{
    ...
    client.BeginReceive(TimeSpan.FromSeconds(5), OnMessageReceived, null);
    ...
}
void OnMessageReceived(IAsyncResult iar)
{
    if(!IsStopped)
    {
        var msg = client.EndReceive(iar);
        if (msg != null)
        {
            var body = msg.GetBody<MyMessageType>();
            ... //Do something interesting with the message
            //Remove the message from the queue
            msg.Complete();
            client.BeginReceive(TimeSpan.FromSeconds(5), OnMessageReceived, null);
        }
    }
}

这样,我就有了一个带有停止机制的"无尽循环"。

最新版本的Azure ServiceBus SDK(下载链接)提供了异步接收消息的完全支持:

async Task TestMethod()
{
    string connectionString = CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString");
    QueueClient Client = QueueClient.CreateFromConnectionString(connectionString, "TestQueue");
    var message = await Client.ReceiveAsync();
}