可观察 Rx 中的回调
本文关键字:回调 Rx 观察 | 更新日期: 2023-09-27 18:12:30
我正在寻找一种优雅的方式来使用 Rx 从普通回调委托创建Observable
,类似于 Observable.FromEventPattern
?
比如说,我正在包装 Win32 EnumWindows
API,它会回调我提供的EnumWindowsProc
。
我知道我可以为此回调创建一个临时 C# 事件适配器并将其传递给FromEventPattern
。此外,我可能可以手动实现IObservable
,因此它会从我的EnumWindowsProc
回调中调用IObserver.OnNext
。
是否存在我缺少的用于在 Rx 中包装回调的现有模式?
您可以使用可用于从命令式编程世界移动到Rx功能世界的Subject<T>
。
Subject<T>
同时实现了IObservable<T>
和IObserver<T>
,因此您可以调用其OnNext
、OnError
和OnCompleted
方法,订阅者将收到通知。
如果要将Subject<T>
公开为属性,则应使用.AsObservable()
,因为这隐藏了IObservable<T>
实际上是Subject<T>
的事实。它使诸如((Subject<string>) obj.Event).OnNext("Foo")
之类的事情变得不可能。
,像EnumWindows
中使用的回调与Rx有细微的不同。 具体来说,回调可以通过其返回值与调用方通信。 Rx 观察者无法执行此操作。 此外,回调可以接收多个参数,但 Rx 观察器接收单个值。 因此,您需要将多个参数包装到单个对象中。
考虑到这一点,使用Subject
的替代方法是使用 Observable.Create
. 这样,您仅在实际存在观察者时才注册回调,如果该观察者取消订阅,则取消注册。
对于您使用示例的同步 API,您可以执行以下操作。 请注意,在此示例中,实际上没有一种方法可以在中途注销回调,因为在我们可以返回取消订阅一次性之前,这一切都是同步发生的。
public static IObservable<Foo> WrapFooApi(string arg1, string arg2)
{
return Observable.Create<Foo>(observer =>
{
FooApi.enumerate(arg1, arg2, e =>
{
observer.OnNext(new Foo(e));
return true;
});
// In your case, FooApi.enumerate is actually synchronous
// so when we get to this line of code, we know
// the stream is complete.
observer.OnCompleted();
return Disposable.Empty;
});
}
// Usage
WrapFooApi("a", "b").Take(1).Subscribe(...); // only takes first item
我们可以通过引入一点异步性来解决无法提前停止的问题,这将使观察者有时间获得可以处理的一次性产品以通知您。 我们可以使用CreateAsync
来获取一个CancellationToken
,该将在观察者取消订阅时取消。 我们可以在 Task.Run
中运行 FooApi
代码:
public static IObservable<Foo> WrapFooApi(string arg1, string arg2)
{
return Observable.CreateAsync<Foo>(async (observer, ct) =>
{
await Task.Run(() => FooApi.register_callback(arg1, arg2, e =>
{
observer.OnNext(e);
// Returning false will stop the enumeration
return !ct.IsCancellationRequested;
}));
observer.OnCompleted();
});
}
在更传统的异步回调 API 中,您在某个时间点注册并在其他时间点取消注册,您可能会有更像这样的东西:
public static IObservable<Foo> WrapFooApi(string args)
{
return Observable.Create<Foo>(observer =>
{
FooToken token = default(FooToken);
var unsubscribe = Disposable.Create(() => FooApi.Unregister(token));
token = FooApi.Register(args, e =>
{
observer.OnNext(new Foo(e));
});
return unsubscribe;
});
}