如何将异常转换为事件,并重新订阅有故障的IObservable

本文关键字:新订阅 故障 IObservable 异常 转换 事件 | 更新日期: 2023-09-27 18:17:15

如何将IObservable流中的异常转换为普通域对象并透明地重新订阅流?

附录:正如James在评论中指出的,我的用例想法类似于在一个不可靠的源(例如网络)上拥有一个应该是连续的流。如果出现故障,请尝试重新连接到源,但要通知下游处理器。

事实上,这与我在将一段异步c#代码转换为f#(使用响应式扩展和FSharpx)时遇到的另一个问题有关,这反过来又源于如何使用可观察对象实现轮询?

事实上,现在我想到了,我可以首先使用如何在f#中编写泛型递归扩展方法?("RetryAfterDelay")(使用更多参数来调整RetryAfterDelay行为)并将其与此实现链接。当重试次数用尽时,将产生域错误,并重新启动轮询器。当然,可能会有更有效的方法,但不管怎样。:)……或者只提供一个回调函数来记录错误,而不是将它们转换为域事件,嗯,选择很多…

但是回到原始代码…

例如,如果我有

public enum EventTypeEnum
{
    None    = 0,
    Normal  = 1,
    Faulted = 2
}
public class Event
{
    public EventTypeEnum Type { get; set; }
} 
private static IObservable<int> FaultingSequence1()
{
    var subject = new ReplaySubject<int>();
    subject.OnNext(1);
    subject.OnNext(2);
    subject.OnError(new InvalidOperationException("Something went wrong!"));
    return subject;
}
private static IEnumerable<int> FaultingSequence2()
{                           
    for(int i = 0; i < 3; ++i)
    {
        yield return 1;
    }
    throw new InvalidOperationException("Something went wrong!");
}
//Additional pondering: Why isn't FaultingSequence2().ToObservable() too be procted by Catch?
//
//This part is for illustratory purposes here. This is the piece I'd like
//behave so that exceptions would get transformed to Events with EventTypeEnum.Faulted
//and passed along to the stream that's been subscribed to while resubscribing to 
//FaultingSequence1. That is, the subscribed would learn about the fault through a
//domain event type.
//Retry does the resubscribing, but only on OnError.
var stream = FaultingSequence1().Catch<int, Exception>(ex =>
{
    Console.WriteLine("Exception: {0}", ex);
    return Observable.Throw<int>(ex);
}).Retry().Select(i => new Event { Type = EventTypeEnum.Normal });
//How to get this to print "Event type: Normal", "Event type: Normal", "Event type: Faulted"?
stream.Subscribe(i => Console.WriteLine("Event type: {0}", i.Type));
这个问题现在真的把我难住了!任何建议吗?

如何将异常转换为事件,并重新订阅有故障的IObservable

有一个叫做Materialize的运算符,它将每个事件转换成Notification<T>:

OnNext:
    OnNext a Notification<T> with Kind OnNext containing a value.
OnError:
    OnNext a Notification<T> with Kind OnError containing an exception.
    OnCompleted.
OnCompleted:
    OnNext a Notification<T> with Kind OnCompleted
    OnCompleted.

所以当OnError或OnCompleted被调用时订阅仍然完成,但是OnError永远不会在订阅服务器上被调用。所以你可以这样做…

source
    .Materialize()
    .Repeat();
然而,这将重新订阅源,即使原始订阅自然完成(通过OnCompleted)

所以也许你仍然希望OnError被调用,但你也希望从原来的OnError异常通过Notification<T>内部的OnNext传递。为此,您可以使用如下命令:

source
    .Materialize()
    .SelectMany(notification => 
        notification.Kind == NotificationKind.OnError
            ? Observable.Return(notification).Concat(Observable.Exception(notification.Exception))
            : Observable.Return(notification)
    )
    .Retry();

通过这种方式,如果订阅自然完成(通过OnCompleted),那么源将不会被重新订阅。

一旦设置好了,就可以将每种类型的通知映射到您想要使用的任何域对象:

source
    .Materialize()
    .SelectMany(notification => 
        notification.Kind == NotificationKind.OnError
            ? Observable.Return(notification).Concat(Observable.Exception(notification.Exception))
            : Observable.Return(notification)
    )
    .Retry()
    .Map(notification => {
        switch (notification.Kind) {
            case (NotificationKind.OnNext):      return // something.
            case (NotificationKind.OnError):     return // something.
            case (NotificationKind.OnCompleted): return // something.
            default: throw new NotImplementedException();
        }
    });