使用异步调用的Rx框架使用void AsyncMethod(操作<;T>;回调)模式

本文关键字:lt 回调 模式 gt 调用 异步 Rx 框架 AsyncMethod void 操作 | 更新日期: 2023-09-27 17:53:31

我看到了很多关于如何在Rx框架中使用Observable.FromAsyncPattern((来简化异步调用的例子,但我使用的接口没有使用IAsyncResult BeginXXX/EndXXX(IAsyncResult(的标准异步模式,所以这对我不起作用。

我正在使用的库使用回调模式公开异步函数:

void GetAllObjects(Action<List<Object>> callback)

在一个理想的世界里,我想把这个:

var isLoadingUsers = true;
var isLoadingSystems = true;
var isLoadingCustomers = true;
var isLoadingRules = true;
mClient.GetAllUsers(UsersCallback);
mClient.GetAllCustomers(CustomersCallback);
mClient.GetAllRules(RulesCallback);
// set the IsLoadingXXX variables to false in callbacks
// once all are false
mClient.GetAllSystems(SystemsCallback);

变成这样:

var o = Observable.ForkJoin(
                     Observable.Start(GetAllUsers()),
                     Observable.Start(GetAllCustomers()),
                     Observable.Start(GetAllRules())
                    ).Finally(() => GetAllSystems);

如何将该模式转化为返回IOobservable的模式?

使用异步调用的Rx框架使用void AsyncMethod(操作<;T>;回调)模式

Func<IObservable<TRet>> FromListCallbackPattern(Action<Action<List<TRet>>> function)
{
    return () => {
        // We use a ReplaySubject so that if people subscribe *after* the
        // real method finishes, they'll still get all the items
        ret = new ReplaySubject<TRet>();
        function((list) => {
            // We're going to "rebroadcast" the list onto the Subject
            // This isn't the most Rx'iest way to do this, but it is the most
            // comprehensible :)
            foreach(var v in list) {
                ret.OnNext(v);
            }
            ret.OnCompleted();
        });
        return ret;
    };
}

现在,你可以做一些类似的事情:

var getAllUsers = FromListCallbackPattern(mClient.GetAllUsers);
getAllUsers().Subscribe(x => /* ... */);

试试Observable.Create(),也许是这样的:

public IObservable<Object> ObserveAllObjects()
{
    return Observable.Create<Object>(
        observer =>
            () => GetAllObjects(objects => objects.ForEach(o => observer.OnNext(o))));
}

我喜欢Observable。为此创建,但@dahlbyk答案不正确(未完成并在取消订阅处理程序中执行操作(。应该是这样的:

    IObservable<List<T>> FromListCallbackPattern<T>(
        Action<Action<List<T>>> listGetter)
    {
        return Observable
            .Create<List<T>>(observer =>
            {
                var subscribed = true;
                listGetter(list =>
                {
                    if (!subscribed) return;
                    observer.OnNext(list);
                    observer.OnCompleted();
                });
                return () =>
                {
                    subscribed = false;
                };
            });
    }

此外,由于最初的API总共返回了一个完整的列表,我认为没有理由过早地将其转换为可观察的。让得到的可观察结果也返回一个列表,如果调用者需要将其压平,他可以使用。SelectMany