Rx将主题的使用转换为Observable.Create方法

本文关键字:转换 Observable Create 方法 Rx | 更新日期: 2023-09-27 18:26:55

我正在尝试使用反应式扩展(Rx)来创建一个可由多个用户订阅的热可观察对象,这些用户都可以获得推送给他们的值。我可以使用以下主题来做到这一点:

var subj = new Subject<int>();
var observable = subj.AsObservable();
observable.Subscribe(x => Console.WriteLine("1 Number: {0}", x));
observable.Subscribe(x => Console.WriteLine("2 Number: {0}", x));
subj.OnNext(1);
subj.OnNext(2);
subj.OnNext(3);
  //and so on

但我读到主题是用于"实验"的,我想用Observable.Create factory方法做同样的事情。我环顾四周,有很多使用Create方法创建冷可观察器的例子,但我希望有与上面代码产生的行为相同的行为。

谢谢你的帮助。

Nick

Rx将主题的使用转换为Observable.Create方法

您的问题可以分为两个单独的问题。

1.如何在没有主题的情况下创建Observable

在这本完美的书中列出了许多方法。Observable.Create只是其中之一,但为了获得像您的示例(1,2,3)中那样的值序列,我只需使用

var source = Observable.Range(1, 3);

但是,正如您可能注意到的,以这种方式创建的流将是冷的可观测流。这引出了第二个问题:

2.如何将冷观察转化为热观察,并在订阅者之间共享结果

为此,您需要一个Publish函数。它允许在订阅者之间共享Rx流。试试这个:

var sourceHot = Observable.Range(1, 3).Publish();
sourceHot.Subscribe(x => Console.WriteLine("1 Number: {0}", x));
sourceHot.Subscribe(x => Console.WriteLine("2 Number: {0}", x));
var disp = sourceHot.Connect();

请注意,如果您不想手动调用Connect/Disconnect,可以使用RefCount函数。还要注意,stream.Publish()与调用stream.Multicast(new Subject<T>())完全相同。

我强烈推荐阅读《RX中的共享:发布、回放和多播》一文,这篇文章深入解释了这个主题。