.NET客户端-在处理下一个请求之前等待MQTT响应

本文关键字:等待 MQTT 响应 请求 下一个 客户端 处理 NET | 更新日期: 2023-09-27 17:58:51

我在循环中有一个MQTT调用,在每次迭代中,它都应该从订阅者返回一个响应,这样我就可以使用发布后转发的值。但问题是我不知道该怎么做

我希望你有一个想法,或者如果我只是没有正确实施,请你指导我度过难关。谢谢

这是我的代码:

// MyClientMgr
class MyClientMgr{
  public long CurrentOutput { get; set; }
  public void GetCurrentOutput(MyObjectParameters parameters, MqttClient client)
  {
      MyMessageObject msg = new MyMessageObject
      {
        Action = MyEnum.GetOutput,
        Data = JsonConvert.SerializeObject(parameters)
      }
      mq_GetCurrentOutput(msg, client);
  }
  private void mq_GetCurrentOutput(MyMessageObject msg, MqttClient client)
  {
      string msgStr = JsonConvert.SerializeObject(msg);
      client.Publish("getOutput", Encoding.UTF8.GetBytes(msgStr),   
MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE, false);
      client.MqttMsgPublishReceived += (sender, e) =>{
           MyObjectOutput output = JsonConvert.DeserializeObject<MyObjectOutput>(Encoding.UTF8.GetString(e.Message));
           CurrentOutput = output;
      };
  }  
}
// MyServerMgr
class MyServerMgr
{
   public void InitSubscriptions()
   {
      mq_GetOutput();
   }
   private void mq_GetOutput()
   {
       MqttClient clientSubscribe = new MqttClient(host);
       string clientId = Guid.NewGuid().ToString();
       clientSubscribe.Connect(clientId);
       clientSubscribe.Subscribe(new string[] { "getOutput" }, new byte[] { MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE });
       MqttClient clientPublish = new MqttClient(host);
       string clientIdPub = Guid.NewGuid().ToString();
       clientPublish.Connect(clientIdPub);
       clientSubscribe.MqttMsgPublishReceived += (sender, e) => {
            MyMessageObj msg = JsonConvert.DeserializeObject<MyMessageObj>(Encoding.UTF8.GetString(e.Message));
            var output = msg.Output;
            clientPublish.Publish("getOutput", Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(output)), MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE, false);
       }
   }
}
// MyCallerClass
class MyCallerClass
{
   var host = "test.mqtt.org";
   var myServer = new MyServerMgr(host);
   var myClient = new MyClientMgr();
   myServer.InitSubscriptions();
   MqttClient client = new MqttClient(host);
   for(int i = 0; i < 10; i++)
   {
      long output = 0;
      MyObjectParameters parameters = {};
      myClient.GetCurrentOutput(parameters, client) // here I call the  method from my client manager 
      // to publish the getting of the output and assigned 
      // below for use,  but the problem is the value doesn't 
      // being passed to the output variable because it is not 
      // yet returned by the server.
      // Is there a way I could wait the process to 
      // get the response before assigning the output?
      output = myClient.CurrentOutput; // output here will always be null  
     // because the response is not yet forwarded by the server 
   }
}

我的调用者类中有一个循环,用于调用mqtt发布以获取输出,但我不知道如何在分配输出之前获取输出,我想先等待响应,然后再进行下一个。

我已经试过这样在里面循环一段时间了:

while(output == 0)
{
   output = myClient.CurrentOutput;
}

是的,我可以在这里得到输出,但这会大大减慢过程。有时它会失败。

请帮帮我。谢谢。

.NET客户端-在处理下一个请求之前等待MQTT响应

看起来您正在尝试通过异步协议(MQTT)进行同步通信。

我的意思是,您希望发送一条消息,然后等待响应,这不是MQTT的工作方式,因为在协议级别没有对消息的回复的概念。

我对C#不太熟悉,所以我只对可能的解决方案进行抽象描述。

我的建议是使用发布线程wait/pulse(查看Monitor类)在每次发布后都有这个块,并在收到响应时让消息处理程序调用pulse。

如果响应不包含识别原始请求的等待,则还需要一个状态机变量来记录正在进行的请求。

你可能想考虑暂停等待,以防对方因某些原因没有回应。

您可以使用具有WaitOne()和Set()方法的AutoResetEvent类。发布后使用WaitOne()将等待消息发布,在client_MqttMsgPublishReceived事件下使用Set()将在订阅者收到他订阅的消息时释放等待。