如何使用 async/await OnNext/OnError/OnDone 方法实现 IObserver

本文关键字:方法 OnDone 实现 IObserver OnError OnNext 何使用 async await | 更新日期: 2023-09-27 18:33:01

我正在尝试为System.Net.WebSocket编写一个扩展方法,该方法将使用反应式扩展(Rx.NET(将其转换为IObserver。您可以看到下面的代码:

public static IObserver<T> ToObserver<T>(this WebSocket webSocket, IWebSocketMessageSerializer<T> webSocketMessageSerializer)
{
    // Wrap the web socket in an interface that's a little easier to manage
    var webSocketMessageStream = new WebSocketMessageStream(webSocket);
    // Create the output stream to the client
    return Observer.Create<T>(
        onNext:      async message => await webSocketMessageStream.WriteMessageAsync(webSocketMessageSerializer.SerializeMessage(message)),
        onError:     async error   => await webSocketMessageStream.CloseAsync(WebSocketCloseStatus.InternalServerError, string.Format("{0}: {1}", error.GetType(), error.Message)),
        onCompleted: async ()      => await webSocketMessageStream.CloseAsync(WebSocketCloseStatus.NormalClosure, "Server disconnected")
    );
}

这段代码有效,但我担心在onNext,onError和onComplete的lambdas中使用async/await。我知道这会返回一个异步空 lambda,这是不受欢迎的(有时会导致我已经遇到过的问题(。

我一直在阅读互联网上的 Rx.NET 文档和博客文章,但我似乎找不到在 IObserver 中使用 async/await 方法的正确方法(如果有的话(。有没有正确的方法可以做到这一点?如果没有,那么我该如何解决此问题?

如何使用 async/await OnNext/OnError/OnDone 方法实现 IObserver

订阅

不接受async方法。所以这里发生的事情是你正在使用async void的即发即弃机制。问题是 onNext 消息将不再序列化。

不应在 订阅 中调用 async 方法,而应将其包装到管道中以允许 Rx 等待它。

可以

onErroronCompleted使用即发即弃,因为这些保证是Observable最后调用的东西。请记住,与Observable关联的资源可以在onError后清理,并在完成之前onCompleted返回。

我写了一个小的扩展方法来做这一切:

    public static IDisposable SubscribeAsync<T>(this IObservable<T> source, Func<T, Task> onNext, Action<Exception> onError, Action onCompleted)
    {
        return source.Select(e => Observable.Defer(() => onNext(e).ToObservable())).Concat()
            .Subscribe(
            e => { }, // empty
            onError,
            onCompleted);
    }

首先,我们将async onNext方法转换为Observable,连接两个异步世界。这意味着将尊重onNext内部的异步/等待。接下来,我们将方法包装到Defer中,这样它就不会在创建时立即启动。相反,Concat只会在前一个完成时才调用下一个延迟可观察的。

附言。我希望 Rx.Net 的下一个版本将在订阅中获得async支持。