以异步方式等待服务器同时对不同调用的响应
本文关键字:调用 响应 方式 等待 服务器 异步 | 更新日期: 2023-09-27 18:02:58
我正在为TCP上的模型铁路控制器编写客户端库。服务器嵌入在控制单元中。
当客户端向服务器发送命令时,例如set(5, addr[3])
,服务器响应一个响应头<REPLY set(5, addr[3])>
,该命令的结果以及是否有错误。
因为所有这些东西都是异步的,所以我必须将回复与命令相匹配。(即使客户端只发送一个命令,然后等待响应,也有服务器端事件)
为了有一个好的和易于理解的接口到这个库,我使用async await
模式。这意味着客户端代码发出一个类似await client.Set(5, "addr", "3")
的调用,代码在服务器发送回响应后继续,并且响应由客户端代码计算。
我目前正在用IDictionary<string, EventWaitHandle>
实现这一点,其中字符串是命令,EventWaitHandle在SendeBefehlAwaitResonse(string befehl)
方法中等待await Task.Run(() => signal = e.WaitOne(timeout));
有更常见的方法吗?对于NetworkClient,我还首先使用EventWaitHandle来等待新消息发送(并使用我的MessageDelay)属性。我发现使用调用await Task.Delay(100);
的无限循环有更好的性能。
问题:
- 有没有更好的方法来等待服务器的响应?也许用响应式扩展或其他库?
如果我必须重写部分库,这对我来说不是什么大问题。我写这个库主要是为了学习。虽然代码(主要是TCP客户端)在某种程度上是被黑客攻击的代码,但我尽我最大的努力为项目提供一个更好、更容易理解的结构。
提前感谢您的帮助!
您可以在这里找到代码:https://github.com/schjan/RailNet |消息调度程序类
Rx可能会让您的生活轻松很多。我还认为,这将减少代码中一些潜在的竞争条件,并最终减少管道风格的代码(即维护缓存的代码)。
如果我从我认为是代码的关键元素(来自https://github.com/schjan/RailNet/blob/master/src/RailNet.Clients.Ecos/Basic/NachrichtenDispo.cs)开始,并将它们拉出到方法中,我看到了这个
private bool HasBeginAndEnd(string[] message)
{
bool isValid = true;
if (!message[0].StartsWith("<") || !message[0].EndsWith(">"))
isValid = false;
if (!message.Last().StartsWith("<END"))
isValid = false;
return isValid;
}
private bool IsReplyMessage(string[] message)
{
return message.Length>0 && message[0].StartsWith("<REPLY ");
}
private BasicAntwort ParseResponse(string[] message)
{
string header = message[0].Substring(7, message[0].Length - 8);
return new BasicAntwort(message, header);
}
使用这些漂亮的小描述性方法,我可以使用Rx创建一个可观察的响应序列。
var incomingMessages = Observable.FromEventPattern<MessageReceivedEventArgs>(
h => _networkClient.MessageReceivedEvent += h,
h => _networkClient.MessageReceivedEvent -= h)
.Select(x => x.EventArgs.Content)
.Where(HasBeginAndEnd)
.Where(IsReplyMessage)
.Select(ParseResponse);
很好,现在我们有一个传入的流/序列。
接下来,我们希望能够发出命令并为其返回适当的响应。Rx也可以这样做。
incomingMessages.Where(reply=>reply.Header == befehl)
为了继续,我们还想添加一个超时,这样如果我们在给定的时间(8000ms?)内没有从out命令得到响应,我们应该抛出。我们可以将其转换为任务,如果我们只想要单个值,那么我们也可以对Rx这样做。
incomingMessages.Where(reply=>reply.Header == befehl)
.Timeout(TimeSpan.FromSeconds(2))
.Take(1)
.ToTask();
快完成了。我们只需要一种发送命令并返回带有响应(或超时)的任务的方法。没有问题。只需先订阅我们的传入消息序列,以避免竞争条件,然后发出命令。
public Task<BasicAntwort> SendCommand(NetworkClient networkClient, string befehl)
{
//Subscribe first to avoid race condition.
var result = incomingMessages
.Where(reply=>reply.Header == befehl)
.Timeout(TimeSpan.FromSeconds(2))
.Take(1)
.ToTask();
//Send command
networkClient.SendMessage(befehl);
return result;
}
下面是LinqPad脚本的完整代码
void Main()
{
var _networkClient = new NetworkClient();
var sendCommandTask = SendCommand(_networkClient, "MyCommand");
BasicAntwort reply = sendCommandTask.Result;
reply.Dump();
}
private static bool HasBeginAndEnd(string[] message)
{
bool isValid = true;
if (!message[0].StartsWith("<") || !message[0].EndsWith(">"))
isValid = false;
if (!message.Last().StartsWith("<END"))
isValid = false;
return isValid;
}
private static bool IsReplyMessage(string[] message)
{
return message.Length>0 && message[0].StartsWith("<REPLY ");
}
private static BasicAntwort ParseResponse(string[] message)
{
string header = message[0].Substring(7, message[0].Length - 8);
return new BasicAntwort(message, header);
}
public IObservable<BasicAntwort> Responses(NetworkClient networkClient)
{
return Observable.FromEventPattern<MessageReceivedEventArgs>(
h => networkClient.MessageReceivedEvent += h,
h => networkClient.MessageReceivedEvent -= h)
.Select(x => x.EventArgs.Content)
.Where(HasBeginAndEnd)
.Where(IsReplyMessage)
.Select(ParseResponse);
}
public Task<BasicAntwort> SendCommand(NetworkClient networkClient, string befehl)
{
//Subscribe first to avoid race condition.
var result = Responses(networkClient)
.Where(reply=>reply.Header == befehl)
.Timeout(TimeSpan.FromSeconds(2))
.Take(1)
.ToTask();
//Send command
networkClient.SendMessage(befehl);
return result;
}
public class NetworkClient
{
public event EventHandler<MessageReceivedEventArgs> MessageReceivedEvent;
public bool Connected { get; set; }
public void SendMessage(string befehl)
{
var handle = MessageReceivedEvent;
if(handle!=null){
var message = new string[3]{"<REPLY " + befehl +">", "Some content", "<END>"};
handle(this, new UserQuery.MessageReceivedEventArgs(){Content=message});
}
}
}
public class MessageReceivedEventArgs : EventArgs
{
public string[] Content { get; set; }
}
public class BasicAntwort
{
public BasicAntwort(string[] message, string header)
{
Header = header;
Message = message;
}
public string Header { get; set; }
public string[] Message { get; set; }
}