当有Start和Stop方法时,用响应式扩展包装事件

本文关键字:响应 扩展 包装 事件 Start Stop 方法 当有 | 更新日期: 2023-09-27 18:11:47

我试图用IObservable包装设备类。如果没有Rx,它的用法如下:

device.IncomingData += data => { /* do something with float[] data */ };
device.Start(500);
// Something...
device.Stop(); 
到目前为止,我已经有了一个包装器类,如下所示,它跟踪有多少观察者正在使用流,并相应地停止和启动它。

有没有一个内置的方法来跟踪观察者与Rx?

private class ObservableWrapper
{
  private int _observers;
  public ObservableStreamer(IDevice device)
  {
    Stream = Observable.FromEvent<float[]>(
      e =>
        {
          device.IncomingData += e;
          int obs = Interlocked.Increment(ref _observers);
          if (obs < 2)
            device.Start();
        },
      e =>
        {
          device.IncomingData -= e;
          int obs = Interlocked.Decrement(ref _observers);
          if (obs < 1)
            device.Stop();
        });
  }
  public IObservable<float[]> Stream { get; private set; }
} 
var wrap = new ObservableWrapper(device);
wrap.Stream.Subscribe(data => { /* do something with float[] data */ });

当有Start和Stop方法时,用响应式扩展包装事件

在构建自定义观察对象时,不要添加refcounting或connection sharing。如果您需要这些功能,您可以将它们分别添加到RefCountPublish中。您也应该几乎没有理由自己实现I(Connectable)Observable

对于您的特定用例,它可以是一个相当简单的扩展方法:

public static DeviceExtensions
{
    public static IObservable<float[]> AsObservable(this Device device)
    {
        return Observable.CreateWithDisposable<float[]>(obs =>
        {
            IDisposable disposable = Observable.FromEvent<float[]>(
                e => device.IncomingData += e,
                e => device.IncomingData -= e
                )
                .Finally(device.Stop)
                .Subscribe(obs);
            device.Start();
            return disposable;
        });
    }
}

现在你可以这样使用:

IObservable<float[]> observableData = device.AsObservable()
    .RefCount(); // If you need ref counting
observableData.Subscribe(data => {});
observableData.Subscribe(data => {});

听起来你拥有的实际上是更好地映射到较少使用的IConnectableObservable。Connect方法将调用Start并返回一个调用Stop的一次性函数。Subscribe方法会转发给Observable。FromEvent(没有所有的引用计数)。然后你可以在上面使用RefCount把它变成一个普通的IObservable。与您当前的实现一样,您必须为所有订阅使用相同的实例,否则计数将无法正常工作。

例如(未编译的代码传入):

class ObservableDevice : IConnectableObservable
{
    public ObservableDevice(IDevice device)
    {
        _device = device;
        //not strictly necessary to cache this, but this way you only
        //create it once
        _stream = Observable.FromEvent<...>(...);
    }
    private IDevice _device;
    private IObservable _stream;
    public IDisposable Connect()
    {
        //it's up to you if you want/need to guard against multiple starts
        _device.Start();
        return Disposable.Create(() => { _device.Stop(); });
    }
    public IDisposable Subscribe(IObserver observer)
    {
        //error checking if you want, or just defer to 
        //_stream.Subscribe's error checking
        return _stream.Subscribe(observer);
    }
}