Rx操作符连续运行任务,并在结束时收集异常

本文关键字:结束 异常 连续 操作符 运行 任务 Rx | 更新日期: 2023-09-27 18:19:07

我在Silverlight 4中使用Rx来调用我的数据访问代码(在没有TPL的情况下)。

我想做3个web服务调用系列(不是并行现在),并得到他们的结果。我曾经使用SelectMany:

var seq = from a1 in Observable.Return(5).Delay(TimeSpan.FromSeconds(2))
          from b1 in Observable.Return(6).Delay(TimeSpan.FromSeconds(2))
          from c1 in Observable.Return(7).Delay(TimeSpan.FromSeconds(2))
          select new { a1, b1, c1 };

但是我希望第二次和第三次调用仍然被执行,即使第一次调用有异常。

是否有一个Rx操作符,将组合序列,但只有OnException一旦它的所有序列完成?在功能上等同于以下代码:

using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
namespace ConsoleApplication4
{
    public class Results
    {
        public int A { get; set; }
        public int B { get; set; }
        public string C { get; set; }
    }
    class Program
    {
        static void Main(string[] args)
        {
            new Program().Test();
        }
        public void Test()
        {
            GetResults().SubscribeOn(Scheduler.NewThread).Subscribe(
                results => Console.WriteLine("{0} {1} {2}", results.A, results.B, results.C),
                ex => Console.WriteLine(ex.ToString()),
                () => Console.WriteLine("Completed")
            );
            Console.WriteLine("Not blocking");
            Console.Read();
        }
        public IObservable<Results> GetResults()
        {
            return Observable.Create<Results>(obs =>
                {
                    var a = Observable.Return(5).Delay(TimeSpan.FromSeconds(2));
                    var b = Observable.Throw<int>(new Exception("uh oh")).Delay(TimeSpan.FromSeconds(2));
                    var c = Observable.Return("7").Delay(TimeSpan.FromSeconds(2));
                    var results = new Results();
                    var exceptions = new List<Exception>();
                    try
                    {
                        results.A = a.FirstOrDefault();
                    }
                    catch (Exception ex)
                    {
                        exceptions.Add(ex);
                    }
                    try
                    {
                        results.B = b.FirstOrDefault();
                    }
                    catch (Exception ex)
                    {
                        exceptions.Add(ex);
                    }
                    try
                    {
                        results.C = c.FirstOrDefault();
                    }
                    catch (Exception ex)
                    {
                        exceptions.Add(ex);
                    }
                    obs.OnNext(results);
                    if (exceptions.Count > 0)
                        obs.OnError(new AggregateException(exceptions.ToArray()));
                    else
                        obs.OnCompleted();
                    return Disposable.Empty;
                });
        }
    }
}

Rx操作符连续运行任务,并在结束时收集异常

如何:

var result = Observable.Concat(
    startObservable1().Catch(Observable.Return<TTheType>(null)),
    startObservable2().Catch(Observable.Return<TTheType>(null)),
    startObservable3().Catch(Observable.Return<TTheType>(null)));

是否有一个Rx操作符,将组合序列,但只有OnException一旦它的所有序列完成?

这部分有点难,我使用这个类,尽管我不是很满意:

public class Maybe<T>
{
    public Exception Exception { get; protected set; }
    T _Value;
    public T Value {
        get {
            if (Exception != null) {
                throw Exception;
            }
            return _Value;
        }
        protected set { _Value = value; }
    }
    public static Maybe<T> AsError(Exception ex)
    {
        return new Maybe<T>() {Value = default(T), Exception = ex};
    }
    public static Maybe<T> AsValue(T value)
    {
        return new Maybe<T>() {Value = value};
    }
}

然后你可以这样做:

var result = Observable.Concat(
    startObservable1().Select(x => Maybe.AsValue(x)).Catch<T1, Exception>(x => Maybe.AsError(x)),
    startObservable2().Select(x => Maybe.AsValue(x)).Catch<T1, Exception>(x => Maybe.AsError(x)),
    startObservable3().Select(x => Maybe.AsValue(x)).Catch<T1, Exception>(x => Maybe.AsError(x)));

您可以编写自己的Maybeify()扩展方法来隐藏Select+Catch