使用已定义的“检索”订阅单个项目.和“;timeout"

本文关键字:项目 单个 quot timeout 定义 检索 | 更新日期: 2023-09-27 17:50:05

我有一个IObservable<Packet>,它是一个热可观察对象,它允许不同的订阅来分析传入的数据包。

我想写一个方法,它发送一些数据与ID,并等待相同ID的响应。伪代码:

void SendData(byte[] data, int retries, int timeout, Action<Packet> success, Action fail) 
{
    var sequenceId = GetSequenceId();
    _port.SendData(data, sequenceId);
    _packetStream.Where(p => p.SequenceId == sequenceId)
                 .Take(1)
                 .WaitForTimeout(timeout)
                 .WaitForRetry(retries)
                 .Subscribe(success) //Need to unsubscribe after packet is received
    //If we didn't receive an answer packet, then call fail() action 
}

我真的不知道,这些东西通常是如何用响应式扩展完成的。我很乐意收到一些建议。谢谢。

使用已定义的“检索”订阅单个项目.和“;timeout"

你的问题中的代码看起来很接近。Rx框架中存在两个"等待"方法(Timeout和Retry)。我建议您更改您的方法以返回IObservable并删除successfail参数。这样做"让你保持在单子中",并允许你在需要时将更多的操作符链接到可观察对象上。而success和fail参数是在订阅结果可观察对象时使用的(分别为OnNext和OnError)。

我假设数据应该在超时时重新发送(否则您并没有真正重试)。要做到这一点,您可以使用Observable.Create在订阅时发送数据。

IObservable<Packet> SendData(byte[] data, int retries, TimeSpan timeout)
{
    //only get the sequence id once per call to SendData, regardless of retries
    var sequenceId = GetSequenceId();
    return Observable.Create(obs =>
        {   //this code runs every time you subscribe
            _port.SendData(data, sequenceId);
            return _packetStream.Where(p => p.SequenceId == sequenceId)
                                .Take(1)
                                .Timeout(timeout)
                                .Subscribe(obs)
        })
        .Retry(retries); 
}

Retry操作符放在末尾会导致Create可观察对象超时时重试。顺便说一句,Timeout的重载允许你传入另一个可观察序列,以便在超时的情况下使用。如果需要,您可以将此重载与Observable.Throw一起使用,以在超时的情况下提供自己的异常,例如提供替代错误消息。

请注意,这段代码在订阅之前不会发送数据,并且在返回结果或达到超时之前不会阻塞,但允许您通过处置订阅来取消进一步的重试。此代码也不会阻止您同时发送多个数据包。如果必须块,可以这样做:

var response = SendData(/* arguments */);
response.Do(success, fail).StartWith(null).ToTask().Wait();

如果你使用c# 5并且在async方法中调用这个,你可以等待这个可观察对象

目前我得到的是:

private void WaitForAnswer(byte sequenceId, int timeout, Action<Packet> success, Action fail, int retriesLeft)
{
    _packetStream.Where(p => p.GetSequence() == sequenceId)
                 .Take(1)
                 .Timeout(TimeSpan.FromMilliseconds(timeout))
                 .Subscribe(success, 
                            _ => {
                                if (retriesLeft > 0) WaitForAnswer(sequenceId, timeout, success, fail, --retriesLeft);
                                else fail();
                            }, 
                            () => { });
}

我不太确定,如果这个解决方案正确地处理订阅。