使用已定义的“检索”订阅单个项目.和“;timeout"
本文关键字:项目 单个 quot timeout 定义 检索 | 更新日期: 2023-09-27 17:50:05
我有一个IObservable<Packet>
,它是一个热可观察对象,它允许不同的订阅来分析传入的数据包。
我想写一个方法,它发送一些数据与ID,并等待相同ID的响应。伪代码:
void SendData(byte[] data, int retries, int timeout, Action<Packet> success, Action fail)
{
var sequenceId = GetSequenceId();
_port.SendData(data, sequenceId);
_packetStream.Where(p => p.SequenceId == sequenceId)
.Take(1)
.WaitForTimeout(timeout)
.WaitForRetry(retries)
.Subscribe(success) //Need to unsubscribe after packet is received
//If we didn't receive an answer packet, then call fail() action
}
我真的不知道,这些东西通常是如何用响应式扩展完成的。我很乐意收到一些建议。谢谢。
你的问题中的代码看起来很接近。Rx框架中存在两个"等待"方法(Timeout和Retry)。我建议您更改您的方法以返回IObservable
并删除success
和fail
参数。这样做"让你保持在单子中",并允许你在需要时将更多的操作符链接到可观察对象上。而success和fail参数是在订阅结果可观察对象时使用的(分别为OnNext和OnError)。
我假设数据应该在超时时重新发送(否则您并没有真正重试)。要做到这一点,您可以使用Observable.Create
在订阅时发送数据。
IObservable<Packet> SendData(byte[] data, int retries, TimeSpan timeout)
{
//only get the sequence id once per call to SendData, regardless of retries
var sequenceId = GetSequenceId();
return Observable.Create(obs =>
{ //this code runs every time you subscribe
_port.SendData(data, sequenceId);
return _packetStream.Where(p => p.SequenceId == sequenceId)
.Take(1)
.Timeout(timeout)
.Subscribe(obs)
})
.Retry(retries);
}
将Retry
操作符放在末尾会导致Create可观察对象超时时重试。顺便说一句,Timeout的重载允许你传入另一个可观察序列,以便在超时的情况下使用。如果需要,您可以将此重载与Observable.Throw
一起使用,以在超时的情况下提供自己的异常,例如提供替代错误消息。
请注意,这段代码在订阅之前不会发送数据,并且在返回结果或达到超时之前不会阻塞,但允许您通过处置订阅来取消进一步的重试。此代码也不会阻止您同时发送多个数据包。如果必须块,可以这样做:
var response = SendData(/* arguments */);
response.Do(success, fail).StartWith(null).ToTask().Wait();
如果你使用c# 5并且在async方法中调用这个,你可以等待这个可观察对象
目前我得到的是:
private void WaitForAnswer(byte sequenceId, int timeout, Action<Packet> success, Action fail, int retriesLeft)
{
_packetStream.Where(p => p.GetSequence() == sequenceId)
.Take(1)
.Timeout(TimeSpan.FromMilliseconds(timeout))
.Subscribe(success,
_ => {
if (retriesLeft > 0) WaitForAnswer(sequenceId, timeout, success, fail, --retriesLeft);
else fail();
},
() => { });
}
我不太确定,如果这个解决方案正确地处理订阅。