为什么订阅 RX.NET by Latest 中的可观察量只接受 1 个订阅者
本文关键字:观察 NET RX by Latest 为什么 | 更新日期: 2023-09-27 18:33:52
我的目标是从一个可观察对象中有两个订阅者,但我只对事件流中的最新项目感兴趣。我希望其他人被丢弃。将此视为每 1 秒更新一次且忽略任何中间值的股票价格屏幕。这是我的代码:
var ob = Observable.Interval(TimeSpan.FromMilliseconds(100)) // fast event source
.Latest().ToObservable().ToEvent();
ob.OnNext += (l =>
{
Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(1000); // slow processing of events
Console.WriteLine("Latest: " + l);
});
ob.OnNext += (l =>
{
Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(1000); // slow processing of events
Console.WriteLine("Latest1: " + l);
// subject.OnNext(l);
});
但是,由于上面的代码,尽管我附加了两个事件(即使您使用订阅表示法也没关系),但仅定期调用第一个订阅。第二个根本跑不了。为什么会这样?
首先,我认为您的要求是以下之一:
- 你只想获得未来的值
- 或者您想要获取最新值(如果有)和任何未来值
- 或者您只需要最新的值(如果有)
- 或者您只想对即时报价进行采样并获取每秒的值
- 或者您的使用者速度较慢,需要执行负载卸除(如在 GUI 中)
1)
的代码var ob = Observable.Interval(TimeSpan.FromMilliseconds(1000)) // fast event source
.Publish();
ob.Connect();
2)的代码
var ob = Observable.Interval(TimeSpan.FromMilliseconds(1000)) // fast event source
.Replay(1);
ob.Connect();
3
) 的代码)var ob = Observable.Interval(TimeSpan.FromMilliseconds(1000)) // fast event source
.Replay(1);
ob.Connect();
var latest = ob.Take(1);
4)的代码可以是这样的,但是在你认为的窗口周围有一些微妙的行为。
var ob = Observable.Interval(TimeSpan.FromMilliseconds(200)) // fast event source
.Replay(1);
//Connect the hot observable
ob.Connect();
var bufferedSource = ob.Buffer(TimeSpan.FromSeconds(1))
.Where(buffer => buffer.Any())
.Select(buffer => buffer.Last());
5)的代码可以在James World的博客 http://www.zerobugbuild.com/?p=192 上找到,并且在伦敦的许多银行应用程序中很常见。
我想你不明白.Latest()
做什么。
public static IEnumerable<TSource> Latest<TSource>(
this IObservable<TSource> source
)
在每次迭代时返回最后一个采样元素并随后阻塞的可枚举序列,直到可观察源序列中的下一个元素可用。
请注意,它会阻止等待可观察量的下一个元素。
因此,当您使用.Latest()
将IObservable<>
变成IEnumerable<>
时,您必须使用.ToObservable()
将其变回IObservable<>
才能呼叫.ToEvent()
。这就是它倒下的地方。
此代码的问题在于您正在创建阻止的代码。
如果你只是这样做,它会起作用:
var ob = Observable.Interval(TimeSpan.FromMilliseconds(100)).ToEvent();
无需调用.Latest()
,因为您始终从可观察量中获取最新值。您永远无法获得更早的值。这是一个可观察的,而不是时间机器。
我也不明白的是,你为什么无论如何都要打电话给.ToEvent()
。 有什么需要?
只需这样做:
var ob = Observable.Interval(TimeSpan.FromMilliseconds(100));
ob.Subscribe(l =>
{
Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(1000); // slow processing of events
Console.WriteLine("Latest: " + l);
});
ob.Subscribe(l =>
{
Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(1000); // slow processing of events
Console.WriteLine("Latest1: " + l);
});