当有Start和Stop方法时,用响应式扩展包装事件
本文关键字:响应 扩展 包装 事件 Start Stop 方法 当有 | 更新日期: 2023-09-27 18:11:47
我试图用IObservable
包装设备类。如果没有Rx,它的用法如下:
device.IncomingData += data => { /* do something with float[] data */ };
device.Start(500);
// Something...
device.Stop();
到目前为止,我已经有了一个包装器类,如下所示,它跟踪有多少观察者正在使用流,并相应地停止和启动它。有没有一个内置的方法来跟踪观察者与Rx?
private class ObservableWrapper
{
private int _observers;
public ObservableStreamer(IDevice device)
{
Stream = Observable.FromEvent<float[]>(
e =>
{
device.IncomingData += e;
int obs = Interlocked.Increment(ref _observers);
if (obs < 2)
device.Start();
},
e =>
{
device.IncomingData -= e;
int obs = Interlocked.Decrement(ref _observers);
if (obs < 1)
device.Stop();
});
}
public IObservable<float[]> Stream { get; private set; }
}
var wrap = new ObservableWrapper(device);
wrap.Stream.Subscribe(data => { /* do something with float[] data */ });
在构建自定义观察对象时,不要添加refcounting或connection sharing。如果您需要这些功能,您可以将它们分别添加到RefCount
和Publish
中。您也应该几乎没有理由自己实现I(Connectable)Observable
。
对于您的特定用例,它可以是一个相当简单的扩展方法:
public static DeviceExtensions
{
public static IObservable<float[]> AsObservable(this Device device)
{
return Observable.CreateWithDisposable<float[]>(obs =>
{
IDisposable disposable = Observable.FromEvent<float[]>(
e => device.IncomingData += e,
e => device.IncomingData -= e
)
.Finally(device.Stop)
.Subscribe(obs);
device.Start();
return disposable;
});
}
}
现在你可以这样使用:
IObservable<float[]> observableData = device.AsObservable()
.RefCount(); // If you need ref counting
observableData.Subscribe(data => {});
observableData.Subscribe(data => {});
听起来你拥有的实际上是更好地映射到较少使用的IConnectableObservable。Connect方法将调用Start并返回一个调用Stop的一次性函数。Subscribe方法会转发给Observable。FromEvent(没有所有的引用计数)。然后你可以在上面使用RefCount把它变成一个普通的IObservable。与您当前的实现一样,您必须为所有订阅使用相同的实例,否则计数将无法正常工作。
例如(未编译的代码传入):
class ObservableDevice : IConnectableObservable
{
public ObservableDevice(IDevice device)
{
_device = device;
//not strictly necessary to cache this, but this way you only
//create it once
_stream = Observable.FromEvent<...>(...);
}
private IDevice _device;
private IObservable _stream;
public IDisposable Connect()
{
//it's up to you if you want/need to guard against multiple starts
_device.Start();
return Disposable.Create(() => { _device.Stop(); });
}
public IDisposable Subscribe(IObserver observer)
{
//error checking if you want, or just defer to
//_stream.Subscribe's error checking
return _stream.Subscribe(observer);
}
}