如何通过observable向可观察流中添加元素.基于流值的定时器

本文关键字:元素 添加 于流值 定时器 observable 何通过 观察 | 更新日期: 2023-09-27 18:06:48

我希望标题足够清楚。我在玩Rx的想法,遇到了一个问题,我不知道如何解决。

我有一个原始形式的事件输入,比如{ 1, 2, 3, 4, 5, 1, 2, 3, 4 }被转换成某种新形式的输入,比如IType。假设每当观察到数字2时,计时器开始滴答并插入一个新的转换值(或者,说,数字6)到流,除非遇到数字5,在这种情况下,计时器被取消。

我正在考虑构建一个从各种来源获取输入的状态机。如果我也能将"定时"事件作为流的一部分就好了。

代码如下,但基本上我目前坚持使用Observable的一些想法。生成(即使其看起来像Observable.Timer,但返回/插入一个新的事件到输入序列)和/或可能像Observable.TakeUntil, Observable.Create和合并流,但我不确定是否有更好的方法,甚至如何在代码方面实现这一点。

在代码方面可以是这样的

using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

namespace RxTesting
{
    public interface IType
    {
        int OriginalInput { get; set; }
    }

    public class SomeEvent: IType
    {
        public int OriginalInput { get; set; }
    }

    public class TimeEvent: IType
    {
        public int OriginalInput { get; set; }
    }
     class  Program
     {
         static  void  Main(string[] args)
         {
             //Here we should have one TimerEvent after the last "2" in the sequence.
             var input = new Subject<int>();
             input.Select(i => new SomeEvent { OriginalInput = i }).SomeTimerOperatorHere().Subscribe(Console.WriteLine);
             input.OnNext(1);
             input.OnNext(2);
             input.OnNext(3);
             input.OnNext(4);
             input.OnNext(5);
             input.OnNext(1);
             input.OnNext(2);
             input.OnNext(3);
             input.OnNext(4);
             input.Dispose();
             Console.ReadKey();
        }
    }
}

在控制台中应该打印一个RxTesting.TimerEvent,因为有一个2,之后没有数字5取消其效果。

如何通过observable向可观察流中添加元素.基于流值的定时器

这应该可以了-我认为你的TakeUntil想法是正确的。

我们正在过滤事件,只包括那些OriginalInput为2的事件,并将每个事件投射到一个值为6的单值延迟流,如果OriginalInput为5的事件到达,该流将被切断。生成的流的流被SelectMany平铺。

public static IObservable<IType> SomeTimerOperatorHere(
  this IObservable<SomeEvent> source)
{
  var delay = TimeSpan.FromSeconds(1);
  return Observable.Create<IType>(o => {
    var sourcePub = source.Publish().RefCount();
    return sourcePub
      .Where(x => x.OriginalInput == 2)
      .SelectMany(type => 
        Observable.Return(new TimeEvent{ OriginalInput = 6 })
                  .Delay(delay)
                  .TakeUntil(sourcePub.Where(x => x.OriginalInput == 5))
      ).Subscribe(o);
  });
}