为什么订阅 RX.NET by Latest 中的可观察量只接受 1 个订阅者

本文关键字:观察 NET RX by Latest 为什么 | 更新日期: 2023-09-27 18:33:52

我的目标是从一个可观察对象中有两个订阅者,但我只对事件流中的最新项目感兴趣。我希望其他人被丢弃。将此视为每 1 秒更新一次且忽略任何中间值的股票价格屏幕。这是我的代码:

    var ob = Observable.Interval(TimeSpan.FromMilliseconds(100)) // fast event source
        .Latest().ToObservable().ToEvent();
    ob.OnNext += (l =>
                      {
                          Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
                          Thread.Sleep(1000); // slow processing of events
                          Console.WriteLine("Latest: " + l);
                                                    });
    ob.OnNext += (l =>
        {
            Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
            Thread.Sleep(1000); // slow processing of events
            Console.WriteLine("Latest1: " + l);
            //  subject.OnNext(l);
        });

但是,由于上面的代码,尽管我附加了两个事件(即使您使用订阅表示法也没关系),但仅定期调用第一个订阅。第二个根本跑不了。为什么会这样?

为什么订阅 RX.NET by Latest 中的可观察量只接受 1 个订阅者

首先,我认为您的要求是以下之一:

  1. 你只想获得未来的值
  2. 或者您想要获取最新值(如果有)和任何未来值
  3. 或者您只需要最新的值(如果有)
  4. 或者您只想对即时报价进行采样并获取每秒的值
  5. 或者您的使用者速度较慢,需要执行负载卸除(如在 GUI 中)

1)

的代码
var ob = Observable.Interval(TimeSpan.FromMilliseconds(1000)) // fast event source
    .Publish();
ob.Connect();

2)的代码

var ob = Observable.Interval(TimeSpan.FromMilliseconds(1000)) // fast event source
    .Replay(1);
ob.Connect();    

3

) 的代码)
var ob = Observable.Interval(TimeSpan.FromMilliseconds(1000)) // fast event source
    .Replay(1);
ob.Connect();  
var latest = ob.Take(1); 

4)的代码可以是这样的,但是在你认为的窗口周围有一些微妙的行为。

var ob = Observable.Interval(TimeSpan.FromMilliseconds(200)) // fast event source
    .Replay(1);
//Connect the hot observable
ob.Connect();
var bufferedSource = ob.Buffer(TimeSpan.FromSeconds(1))
    .Where(buffer => buffer.Any())
    .Select(buffer => buffer.Last());  

5)的代码可以在James World的博客 http://www.zerobugbuild.com/?p=192 上找到,并且在伦敦的许多银行应用程序中很常见。

我想你不明白.Latest()做什么。

public static IEnumerable<TSource> Latest<TSource>(
    this IObservable<TSource> source
)

在每次迭代时返回最后一个采样元素并随后阻塞的可枚举序列,直到可观察源序列中的下一个元素可用。

请注意,它会阻止等待可观察量的下一个元素。

因此,当您使用.Latest()IObservable<>变成IEnumerable<>时,您必须使用.ToObservable()将其变回IObservable<>才能呼叫.ToEvent()。这就是它倒下的地方。

此代码的问题在于您正在创建阻止的代码。

如果你只是这样做,它会起作用:

var ob = Observable.Interval(TimeSpan.FromMilliseconds(100)).ToEvent();

无需调用.Latest(),因为您始终从可观察量中获取最新值。您永远无法获得更早的值。这是一个可观察的,而不是时间机器。

我也不明白的是,你为什么无论如何都要打电话给.ToEvent()。 有什么需要?

只需这样做:

var ob = Observable.Interval(TimeSpan.FromMilliseconds(100));
ob.Subscribe(l =>
{
    Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
    Thread.Sleep(1000); // slow processing of events
    Console.WriteLine("Latest: " + l);
});
ob.Subscribe(l =>
{
    Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
    Thread.Sleep(1000); // slow processing of events
    Console.WriteLine("Latest1: " + l);
});