获得IOobservable<;T>;来自Action<;T>

本文关键字:gt lt Action 来自 获得 IOobservable | 更新日期: 2023-09-27 18:02:04

我有一个方法void OnAction(Action<Person> callback),我想使用反应扩展(Rx(从中创建一个IObservable<T>

我找到了两种可以帮助我的方法:Observable.FromEvent()Observable.Start():

var observable = Observable.Start(() =>
                                              {
                                                  Person person = null;
                                                  _mngr.OnAction(p => person = p);
                                                  return person;
                                              });

和:

  var observable = Observable.FromEvent<Person>(
                action => _mngr.OnAction(action), //Add Handler
                action => // Remove Handler
                {
                });

第一个有一个闭包,我必须评估if person != null:

var foo= observable.Where(p =>
                          {
                if(p!=null) //...
                          });

第二个采用Action参数,该参数将给定的事件处理程序与底层.NET事件分离。。。但是OnAction方法不是.NET事件。

这两种方法都很好,但(在我看来(闻起来。。。

那么,从OnAction方法创建IOobservable的最佳方法是什么?

获得IOobservable<;T>;来自Action<;T>

详细阐述Chris的回答并发表您的评论。从这里开始:

var personAsObservable = Observable.Create<Person>(observer => {
    _mngr.OnAction(person => {
        observer.OnNext(person);
        observer.OnCompleted();
    });
    return Disposable.Empty;
});

目前,这将导致为每个订户调用OnAction

避免这种情况的一般方法是发布可观察到的。发布流会导致订阅者共享事件。

Publish运算符返回一个可连接的可观察。这可以接受订阅者,但在您调用Connect()(一个返回IDisposable的方法,您可以使用它来控制与底层可观察对象的单个连接(之前,不会实际订阅底层流。

有几个与发布相关的运算符可以帮助您管理对基础流的订阅。

RefCount使用可连接的可观察对象来管理连接,并在底层订阅运行时与订阅共享事件。一旦完成,后续订阅将重新启动。这可能足以满足您的目的。要使用它,请订阅以下内容(这是一个非常常见的Rx习惯用法(:

var personPub = personAsObservable.Publish().RefCount();

其他方法包括将Replay(n)附加到可观察的源,其中n个事件将被缓存并重播给在底层流完成后到达的子序列订阅者。因此,如果你只想得到一次结果,这很有用。请注意,必须显式调用Replay上的Connect。您也可以直接拨打Publish并管理自己的连接。

请注意,附加这些操作符不会改变底层可观察对象的行为——所有的发布、缓存等都是在附加的操作符上完成的。因此,在上面的示例中,订阅者应该使用personPub

显式控制连接如下所示:

IConnectableObservable<Person> personPub = personAsObservable.Publish();
var subscriberOne = personPub.Subscribe(...); // personAsObservable not started
var connection = personPub.Connect(); // *now* personAsObservable is subscribed
var subscriberTwo = personPub.Subscribe(...); // shares underlying subscription
                                              // but could miss events
connection.Dispose(); // underlying connection terminated
                      // but may have already OnCompleted anyway
                      // in which case this is a no-op
var personAsObservable
        = Observable.Create<Person>(observer => {
            _mngr.OnAction(person => {
                observer.OnNext(person);
                observer.OnCompleted();
            });
        });

如果您希望确保此方法只调用一次,可以执行以下操作。

var publishedPerson = personaAsObservable.Replay(1);
publishedPerson.Connect();
publishedPerson.Subscribe(Console.WriteLine);
publishedPerson.Subscribe(Console.WriteLine);
publishedPerson.Subscribe(Console.WriteLine);
publishedPerson.Subscribe(Console.WriteLine);