DoAsync with Rx

本文关键字:Rx with DoAsync | 更新日期: 2023-09-27 18:20:09

首先,是否有类似"DoAsync"的实现可用于Rx?考虑到我有一个使用相同线程的特定SynchronizationContextISchedulerhttps://gist.github.com/OmerMor/1554548

现在,请参阅下面的代码:

  public static IObservable<T> DoAsyncWithFallBack<T>(this IObservable<T> source, Func<T, Task> accessor, Action<T, Exception> localizedFallback)
    {
        return new AnonymousObservable<T>(obs =>
        {
            return source.Subscribe(async x =>
            {
                try
                {
                    await accessor(x);
                    obs.OnNext(x);
                }
                catch (Exception ex)
                {
                    localizedFallback(x, ex);
                    obs.OnError(ex);
                }
            }, obs.OnError, obs.OnCompleted);
        });
    } 

以及用法(仅举个例子):

   Observable.Return(1)
             .ObserveOn(scheduler)
             .Select(a => new
                             {
                                 EventData = a,
                                 Task = TaskEx.Run(() => DoSomething(a))
                             })
             .DoAsyncWithFallBack(async a => await a.Task, (a, ex) => FallBackPlan(a.EventData, ex))
             .Subscribe(next => {}, ex => {});

使用上面的代码,如果我遇到异常,堆栈跟踪对我没有任何帮助,我会丢失有关DoSomething方法的信息,那么获取它的正确方法是什么?

如果你稍微搜索一下,你会发现一些关于async/await异常的问题:是否可以使用.NET异步方法获得良好的堆栈跟踪?

DoAsync with Rx

对于.NET 4.0,最好在将异常传递给OnError之前包装它。这将保留异常的堆栈跟踪。

obs.OnError(new ApplicationException("Error in DoAsync", ex));