WinRT 上的 Rx:随着时间的推移迭代收集

本文关键字:迭代 时间 Rx 上的 WinRT | 更新日期: 2023-09-27 18:32:51

我想通过在特定时间间隔内从一个元素移动到另一个元素来迭代集合。因此,例如此方法可以正常工作:

        var a = new List<int> { 1, 2, 3, 4}.ToObservable();
        var b = Observable.Interval(TimeSpan.FromSeconds(1));
        var c = a.Zip(b, (l, r) => l);
        c.Subscribe(x => Debug.WriteLine(x));

但是我想使用列表中每个元素的值作为间隔,所以我使用以下代码:

        var a = new List<int> { 1, 2, 3, 4}.ToObservable();
        var b = a.Delay(x => Observable.Timer(TimeSpan.FromSeconds(x)));
        b.Subscribe(x => Debug.WriteLine(x));

正如它在这里所说的 http://blogs.msdn.com/b/rxteam/archive/2012/03/12/reactive-extensions-v2-0-beta-available-now.aspx"延迟的新重载允许人们指定(可选)订阅的延迟以及基于选择器函数的每个元素的延迟"。但是运行代码并不像预期的那样工作。它只是使用 1 秒间隔吐出列表中的元素。这样:

。(1秒)1...(1秒)2...(1秒)3...(1秒)四

而不是

。(1秒)1...(2秒)2...(3秒)3...(4秒)四

我错过了什么吗?

WinRT 上的 Rx:随着时间的推移迭代收集

这个怎么样:

new[] {1,2,3,4,5,}.ToObservable()
    .Select(x => Observable.Return(x).Delay(TimeSpan.FromSeconds(x)))
    .Concat();

右?您想将数字用作值以及延迟的时间量吗?

这是一个解决方案,它只在可枚举源上推进迭代器一步,然后再次等待(适用于无限可枚举的源序列):

static IObservable<T> ToObservableDelay<T>(
    IEnumerable<T> source, 
    Func<T, TimeSpan> delaySelector, 
    IScheduler scheduler
)
{
    return Observable.Create<T>(o =>
        scheduler.ScheduleAsync(async (s, c) =>
        {
            try
            {
                foreach (var x in source)
                {
                    await Task.Delay(delaySelector(x), c);
                    o.OnNext(x);
                }
                o.OnCompleted();
            }
            catch (TaskCanceledException) { /* ignore */ }
            catch (Exception e)
            {
                o.OnError(e);
            }
         })
    );
}

这里有一个带有无限生成器的小演示:

static IEnumerable<double> Uniform(Random rng)
{
    while (true)
        yield return rng.NextDouble();
}
static void Main(string[] args)
{
    var source = Uniform(new Random());
    Console.WriteLine("press any key to quit");
    using (var subscription = 
        ToObservableDelay(source, TimeSpan.FromSeconds, Scheduler.Default)
        .Subscribe(Console.WriteLine))
    {
        Console.ReadKey();
    }
}

在 RT 应用程序中运行的简单演示代码:

 var source = Uniform(new Random());
 ToObservableDelay(source, TimeSpan.FromSeconds, Scheduler.Default)
     .Take(10)
     .Subscribe(x => Debug.WriteLine(x));

我在尝试解决同一问题时发现了这个问题。我的解决方案是将 Observable.Generator 与 IEnumerable 的枚举器一起使用,如下所示:

public static IObservable<T> ToObservable<T>(this IEnumerable<T> source, TimeSpan time)
{
    var enumerator = source.GetEnumerator();
    var observable = Observable.Generate(
        enumerator.Current,
        _ => enumerator.MoveNext(),
        _ => enumerator.Current,
        _ => enumerator.Current,
        _ => time);
    return observable;
}

然后简单地调用此扩展方法,如下所示:

var observable = Enumerable.Range(1, 10)
    .ToObservable(TimeSpan.FromSeconds(1));
using (observable.Subscribe(Console.WriteLine))
{
    Console.WriteLine("Press the Any key to abort");
    Console.ReadKey();
}