将IEnumerable转换为IObservable,并带有变量Period

本文关键字:变量 Period IEnumerable 转换 IObservable | 更新日期: 2023-09-27 17:54:32

我正在使用Rx消耗3轴加速度计数据。我需要设置一些单元测试。数据帧的速度很快,帧之间的中间时间间隔为80ms,但有时也会达到120ms。此外,它从来不会精确到80ms,而是在这个范围内徘徊。

所以我已经创建了一个订阅IObservable的类,并将数据帧顺序记录在.csv文件中。csv文件中的一个字段是帧的开始时间,其中第一帧的开始时间为0.0。

现在我想读取该文件并再次流式传输,以用于测试和开发。我想使用StartTime字段作为在测试时何时触发任何给定加速度计帧的调度。

我浏览了这个问题的答案,用。net响应式扩展定期调度IEnumerable但它似乎只适用于固定的时间范围。

问题:在Rx框架中是否已经有一种规范和首选的方式来安排不规则(但已知)间隔的帧推送,或者我应该以某种方式滚动我自己的?

编辑2:我感兴趣的东西可能很简单,就像:

IObservable<T> AsObservable(
   IEnumerable<T> source, Func<T, TimeSpan> getTimeDelta)
{
   var retVal = ColdObservableVaryingTime();
   foreach(var frame in source)
   {
       retVal.AddScheduled(getTimeDelta, frame);
   }
   return retVal;
}

编辑1:在这个问题中我称之为"帧",Rx文档调用TState。

将IEnumerable转换为IObservable,并带有变量Period

这里有另一个选择。Observable.Generate方法非常强大,它可以用来生成相当复杂的值序列。

你可以这样做。

那么,从这样的CSV文件开始:

var csvLines = new []
{
    "A,0",
    "B,3",
    "C,4",
    "D,6",
};

(可以读作var csvLines = File.ReadAllLines(@"...");)

然后可以解析以下行:

var parsed =
(
    from line in csvLines
    let parts = line.Split(',')
    select new
    {
        item = parts[0],
        seconds = int.Parse(parts[1])
    }
).ToArray();

然后确定CSV中每行之间的秒offset:

var data =
    parsed
        .Zip(parsed.Skip(1), (p0, p1) => new
        {
            p1.item,
            offset = p1.seconds - p0.seconds,
        })
        .ToArray();
现在你可以创建可观察对象了:
var observable =
    Observable
        .Generate(
            0,
            n => n < data.Length,
            n => n + 1,
            n => data[n].item,
            n => TimeSpan.FromSeconds(data[n].offset))
        .StartWith(parsed[0].item);

当我运行这段代码时,我从源文件中得到了正确的计时。


我已经更新了我的代码给出你的类定义在下面的注释:

IEnumerable<AccelerometerFrame_raw> frames = ...;
var data =
    frames
        .Zip(frames.Skip(1), (f0, f1) => new
        {
            f1,
            offset = f1.TimeStampSeconds - f0.TimeStampSeconds,
        })
        .ToArray();
IObservable<AccelerometerFrame_raw> observable =
    Observable
        .Generate(
            0,
            n => n < data.Length,
            n => n + 1,
            n => data[n].f1,
            n => TimeSpan.FromSeconds(data[n].offset))
        .StartWith(frames.First());

为此,您可以使用TestScheduler(获取Rx-Testing Nuget包)。使用这个类作为测试中的isscheduler实现将允许您调度序列,因为测试调度程序允许您通过声明每个项目应该何时推送来创建可观察序列。然后你可以"播放"调度程序或提前到某个时间,看看发生了什么。请注意,在此设置中,时间是虚拟化的,为了简单起见,TestScheduler将tick作为其时间进度单位。

RX简介网站有一个很好的部分关于RX测试在这里:http://www.introtorx.com/content/v1.0.10621.0/16_TestingRx.html

James Lucas提供的答案是正确方向的一个很好的指针,但成功的答案是更复杂的。

在用csv文件中的值填充IEnumerable之后,我必须填充

数组
Recorded<Notification<AccelerometerFrame_raw>>

然后将数组作为参数传递给调度器CreateColdAbservable。完成这些之后,冷观测就在那里等待启动了。在我的特殊情况下,代码看起来像这样:

private TestScheduler sched {get; set;}
public IObservable<AccelerometerFrame_raw> DataStream { get; protected set; }
ctor()
{
   DataStream = SetupDeviceStream();
}
  private IObservable<AccelerometerFrame_raw> SetupDeviceStream()
  {
     var framesArray = 
       new Recorded<Notification<AccelerometerFrame_raw>>[allFrames.Count+1];
     int i = 0;
     long timeStamp = 0;
     foreach(var item in allFrames)
     {
        timeStamp += (long) (item.TimeStampSeconds * 1000.0);
        framesArray[i] = new Recorded<Notification<AccelerometerFrame_raw>>(
           timeStamp, 
           Notification.CreateOnNext(item));
        i++;
     }
     framesArray[i] = new Recorded<Notification<AccelerometerFrame_raw>>(
           timeStamp + 10, 
           Notification.CreateOnCompleted<AccelerometerFrame_raw>());
     sched = new TestScheduler();
     var stream =sched.CreateColdObservable(framesArray);
     return stream;
  }

////当我准备好启动cold observable时,我调用

sched.Start();

把这一切放在一起的帮助来自

(Phil Haak) http://haacked.com/archive/2014/03/10/master-time-with-reactive-extensions/

和https://msdn.microsoft.com/en-us/library/hh229343 (v = vs.103) . aspx

还需要安装另一个Nuget包:

http://www.nuget.org/packages/reactiveui-testing/

正如Phil Haak在链接的博客文章中所说,"不幸的是,[TestScheduler]按原样使用有点痛苦,这就是为什么Paul Betts自己编写了一些有用的TestScheduler扩展方法"