使用rx订阅事件并在时间间隔后执行日志记录

本文关键字:执行 记录 日志 时间 rx 事件 使用 | 更新日期: 2023-09-27 18:01:00

我有一个简单的用例,其中:

  1. 接收事件通知
  2. 对事件执行一些操作
  3. 在x间隔后打印内容

如何在单个Rx管道中执行上述步骤?

如下所示:

void Main()
{
    var observable = Observable.Interval(TimeSpan.FromSeconds(1));
    // Receive event and call Foo()
    observable.Subscribe(x=>Foo());
    // After 1 minute, I want to print the result of count
    // How do I do this using above observable?
}
int count = 0;
void Foo()
{
    Console.Write(".");
    count ++;
}

使用rx订阅事件并在时间间隔后执行日志记录

我认为这符合您的要求:

var observable =
    Observable
        .Interval(TimeSpan.FromSeconds(1))
        .Do(x => Foo())
        .Window(() => Observable.Timer(TimeSpan.FromMinutes(1.0)));
var subscription =
    observable
        .Subscribe(xs => Console.WriteLine(count));

然而,将状态和可观察性混合在一起是个坏主意。如果您有两个订阅,则count的增量会快一倍。最好将您的状态封装在可观察对象中,这样每个订阅都会获得一个新的count实例。

试试这个:

var observable =
    Observable
        .Defer(() =>
        {
            var count = 0;
            return
                Observable
                    .Interval(TimeSpan.FromSeconds(1))
                    .Select(x =>
                    {
                        Console.Write(".");
                        return ++count;
                    });
        })
        .Window(() => Observable.Timer(TimeSpan.FromMinutes(0.1)))
        .SelectMany(xs => xs.LastAsync());
var subscription =
    observable
        .Subscribe(x => Console.WriteLine(x));

我得到这样的输出:

...........................................................59
............................................................119
............................................................179
............................................................239

记住它从0开始,那么这是一个很好的时机。


在看到paulpdaniels的答案后,我意识到我可以用更简单的Sample运算符代替我的Window/SelectMany/LastAsync

此外,如果我们真的不需要增加计数器的副作用,那么整个可观察到的结果就会缩小到:

var observable =
    Observable
        .Interval(TimeSpan.FromSeconds(1.0))
        .Do(x => Console.Write("."))
        .Sample(TimeSpan.FromMinutes(1.0));
observable.Subscribe(x => Console.WriteLine(x));

简单多了!

我会使用Select+Sample:

var observable = Observable.Interval(TimeSpan.FromSeconds(1))
          .Select((x, i) => {
            Foo(x);
            return i;
          })
          .Do(_ => Console.Write("."))
          .Sample(TimeSpan.FromMinutes(1));
observable.Subscribe(x => Console.WriteLine(x));

Select有一个重载,它返回当前值的索引,通过返回该索引,然后以1分钟的间隔采样,您可以获得该间隔期间发出的最后一个值。