Rx操作符连续运行任务,并在结束时收集异常
本文关键字:结束 异常 连续 操作符 运行 任务 Rx | 更新日期: 2023-09-27 18:19:07
我在Silverlight 4中使用Rx来调用我的数据访问代码(在没有TPL的情况下)。
我想做3个web服务调用系列(不是并行现在),并得到他们的结果。我曾经使用SelectMany:
var seq = from a1 in Observable.Return(5).Delay(TimeSpan.FromSeconds(2))
from b1 in Observable.Return(6).Delay(TimeSpan.FromSeconds(2))
from c1 in Observable.Return(7).Delay(TimeSpan.FromSeconds(2))
select new { a1, b1, c1 };
但是我希望第二次和第三次调用仍然被执行,即使第一次调用有异常。
是否有一个Rx操作符,将组合序列,但只有OnException一旦它的所有序列完成?在功能上等同于以下代码:
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
namespace ConsoleApplication4
{
public class Results
{
public int A { get; set; }
public int B { get; set; }
public string C { get; set; }
}
class Program
{
static void Main(string[] args)
{
new Program().Test();
}
public void Test()
{
GetResults().SubscribeOn(Scheduler.NewThread).Subscribe(
results => Console.WriteLine("{0} {1} {2}", results.A, results.B, results.C),
ex => Console.WriteLine(ex.ToString()),
() => Console.WriteLine("Completed")
);
Console.WriteLine("Not blocking");
Console.Read();
}
public IObservable<Results> GetResults()
{
return Observable.Create<Results>(obs =>
{
var a = Observable.Return(5).Delay(TimeSpan.FromSeconds(2));
var b = Observable.Throw<int>(new Exception("uh oh")).Delay(TimeSpan.FromSeconds(2));
var c = Observable.Return("7").Delay(TimeSpan.FromSeconds(2));
var results = new Results();
var exceptions = new List<Exception>();
try
{
results.A = a.FirstOrDefault();
}
catch (Exception ex)
{
exceptions.Add(ex);
}
try
{
results.B = b.FirstOrDefault();
}
catch (Exception ex)
{
exceptions.Add(ex);
}
try
{
results.C = c.FirstOrDefault();
}
catch (Exception ex)
{
exceptions.Add(ex);
}
obs.OnNext(results);
if (exceptions.Count > 0)
obs.OnError(new AggregateException(exceptions.ToArray()));
else
obs.OnCompleted();
return Disposable.Empty;
});
}
}
}
如何:
var result = Observable.Concat(
startObservable1().Catch(Observable.Return<TTheType>(null)),
startObservable2().Catch(Observable.Return<TTheType>(null)),
startObservable3().Catch(Observable.Return<TTheType>(null)));
是否有一个Rx操作符,将组合序列,但只有OnException一旦它的所有序列完成?
这部分有点难,我使用这个类,尽管我不是很满意:
public class Maybe<T>
{
public Exception Exception { get; protected set; }
T _Value;
public T Value {
get {
if (Exception != null) {
throw Exception;
}
return _Value;
}
protected set { _Value = value; }
}
public static Maybe<T> AsError(Exception ex)
{
return new Maybe<T>() {Value = default(T), Exception = ex};
}
public static Maybe<T> AsValue(T value)
{
return new Maybe<T>() {Value = value};
}
}
然后你可以这样做:
var result = Observable.Concat(
startObservable1().Select(x => Maybe.AsValue(x)).Catch<T1, Exception>(x => Maybe.AsError(x)),
startObservable2().Select(x => Maybe.AsValue(x)).Catch<T1, Exception>(x => Maybe.AsError(x)),
startObservable3().Select(x => Maybe.AsValue(x)).Catch<T1, Exception>(x => Maybe.AsError(x)));
您可以编写自己的Maybeify()扩展方法来隐藏Select+Catch