如何在IObservable管道中保存异常,并在结束时重新抛出它

本文关键字:结束 新抛出 异常 IObservable 管道 保存 | 更新日期: 2023-09-27 18:05:43

我有以下方法:

public IObservable<DataManagementWorkItem> GetWorkItemSource(int maxConcurrentCalls)
{
    return m_namespaceManager
        .GetNamespaceConnectionInfoSource(true, drainAndDisable: false)
        .Select(nci => Observable.Defer(() => GetPolicySourceForNamespace(nci)))
        .Merge(maxConcurrentCalls)
        .Where(IsValid)
        .Select(ToWorkItem)
        .Where(o => o != null);
}

实现了以下逻辑:

  1. 通过从命名空间管理器(GetNamespaceConnectionInfoSource)获取IObservable<NamespaceConnectionInfo>进入monad。
  2. 当命名空间可用时,获取特定命名空间(GetPolicySourceForNamespace)对应的IObservable<DataManagementPolicy>。但是,可以使用Merge操作符来限制GetPolicySourceForNamespace的并发呼叫数。
  3. 过滤掉坏的DataManagementPolicy记录(不能在SQL中完成)。
  4. 将看似良好的DataManagementPolicy记录转换为DataManagementWorkItem实例。有些可能会变成null,所以它们在最后被过滤掉。

GetNamespaceConnectionInfoSource在产生一定数量的有效NamespaceConnectionInfo对象后会发生故障。完全有可能在最后的可观测序列中已经产生了一定数量的DataManagementWorkItem天体。

我有一个单元测试,其中:
  • GetNamespaceConnectionInfoSource在生成25个命名空间后抛出
  • GetPolicySourceForNamespace在每个命名空间生成10个对象
  • 并发限制是10

我也有兴趣在最终可观察对象出现故障之前检查它产生的项目:

var dm = DependencyResolver.Instance.GetInstance<IDataManagement>();
var workItems = new List<DataManagementWorkItem>();
try
{
    var obs = dm.GetWorkItemSource(10);
    obs.Subscribe(wi => workItems.Add(wi));
    await obs;
    Assert.Fail("An expected exception was not thrown");
}
catch (Exception exc)
{
    AssertTheRightException(exc);
}

workItems集合每次都有不同数量的项目。有一次它有69项,另一次有50项,再一次有18项。

我的解释是,当故障发生时,在处理的各个阶段都有良好的NamespaceConnectionInfoDataManagementPolicy对象,所有这些对象都因为故障而终止。每次的数量是不同的,因为项目是异步生产的。

我的问题就在这里——我不想让他们流产。我希望它们运行到完成,在最终的可观察序列中产生,然后才能通信故障。实际上,我想保留异常并在结束时重新抛出它。

我试着稍微修改一下实现:

public IObservable<DataManagementWorkItem> GetWorkItemSource(int maxConcurrentCalls)
{
    Exception fault = null;
    return m_namespaceManager
        .GetNamespaceConnectionInfoSource(true, drainAndDisable: false)
        .Catch<NamespaceConnectionInfo, Exception>(exc =>
        {
            fault = exc;
            return Observable.Empty<NamespaceConnectionInfo>();
        })
        .Select(nci => Observable.Defer(() => GetPolicySourceForNamespace(nci)))
        .Merge(maxConcurrentCalls)
        .Where(IsValid)
        .Select(ToWorkItem)
        .Where(o => o != null)
        .Finally(() =>
        {
            if (fault != null)
            {
                throw fault;
            }
        });
}

不用说,它没有工作。Finally似乎没有传播任何异常,我实际上同意。

那么,什么是实现我想要的正确方法呢?

编辑

与这个问题无关,我发现我用来收集生成的DataManagementWorkItem实例的测试代码很糟糕。而不是

    var obs = dm.GetWorkItemSource(10);
    obs.Subscribe(wi => workItems.Add(wi));
    await obs;

应该是

    await dm.GetWorkItemSource(1).Do(wi => workItems.Add(wi));

不同之处在于,后者只订阅了一次条目源,而原始版本订阅了两次:

  1. /Subscribe
  2. by await

它不影响问题,但破坏了我的嘲弄代码。

这更像是一个澄清。每个名称空间产生一个包含10个策略对象的序列。但是这个过程是异步的——策略对象是按顺序异步生成的。在这段时间内,命名空间继续产生,因此在故障发生前给定25个命名空间,产生的命名空间可能有三种可能的"状态":

  • 还没有为它生成策略对象,但是异步策略生成过程已经启动
  • 一些(但少于10个)策略对象已经产生
  • 名称空间的所有10个策略对象都已生成

当命名空间生成过程中出现错误时,不管"好的"命名空间现在处于什么"状态",整个管道都会被终止。

让我们看看下面这个简单的例子:

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
namespace observables
{
    class Program
    {
        static void Main()
        {
            int count = 0;
            var obs = Observable
                .Interval(TimeSpan.FromMilliseconds(1))
                .Take(50)
                .Select(i =>
                {
                    if (25 == Interlocked.Increment(ref count))
                    {
                        throw new Exception("Boom!");
                    }
                    return i;
                })
                .Select(i => Observable.Defer(() => Observable.Interval(TimeSpan.FromMilliseconds(1)).Take(10).Select(j => i * 1000 + j)))
                .Merge(10);
            var items = new HashSet<long>();
            try
            {
                obs.Do(i => items.Add(i)).GetAwaiter().GetResult();
            }
            catch (Exception exc)
            {
                Debug.WriteLine(exc.Message);
            }
            Debug.WriteLine(items.Count);
        }
    }
}

当我运行它时,我通常有以下输出:

Boom!
192

但是,它也可以显示191。然而,如果我们应用故障连接解决方案(即使它在没有故障的情况下不起作用):

        int count = 0;
        var fault = new Subject<long>();
        var obs = Observable
            .Interval(TimeSpan.FromMilliseconds(1))
            .Take(50)
            .Select(i =>
            {
                if (25 == Interlocked.Increment(ref count))
                {
                    throw new Exception("Boom!");
                }
                return i;
            })
            .Catch<long, Exception>(exc =>
            {
                fault.OnError(exc);
                return Observable.Empty<long>();
            })
            .Select(i => Observable.Defer(() => Observable.Interval(TimeSpan.FromMilliseconds(1)).Take(10).Select(j => i * 1000 + j)))
            .Merge(10)
            .Concat(fault);

那么输出始终是240,因为我们让所有已经启动的异步进程完成。

基于pmccloghrylaing

    public IObservable<DataManagementWorkItem> GetWorkItemSource(int maxConcurrentCalls)
    {
        var fault = new Subject<DataManagementWorkItem>();
        bool faulted = false;
        return m_namespaceManager
            .GetNamespaceConnectionInfoSource(true, drainAndDisable: false)
            .Catch<NamespaceConnectionInfo, Exception>(exc =>
            {
                faulted = true;
                return Observable.Throw<NamespaceConnectionInfo>(exc);
            })
            .Finally(() =>
            {
                if (!faulted)
                {
                    fault.OnCompleted();
                }
            })
            .Catch<NamespaceConnectionInfo, Exception>(exc =>
            {
                fault.OnError(exc);
                return Observable.Empty<NamespaceConnectionInfo>();
            })
            .Select(nci => Observable.Defer(() => GetPolicySourceForNamespace(nci)))
            .Merge(maxConcurrentCalls)
            .Where(IsValid)
            .Select(ToWorkItem)
            .Where(o => o != null)
            .Concat(fault);
    }

它在命名空间产生错误和成功时都可以工作,但是看起来很尴尬。加上多个订阅仍然共享故障。一定有一个更优雅的解决方案。

GetNamespaceConnectionInfoSource源代码

public IObservable<NamespaceConnectionInfo> GetNamespaceConnectionInfoSource(bool? isActive = null,
    bool? isWorkflowEnabled = null, bool? isScheduleEnabled = null, bool? drainAndDisable = null,
    IEnumerable<string> nsList = null, string @where = null, IList<SqlParameter> whereParameters = null)
{
    IList<SqlParameter> parameters;
    var sql = GetNamespaceConnectionInfoSqls.GetSql(isActive,
        isWorkflowEnabled, isScheduleEnabled, drainAndDisable, nsList, @where, whereParameters, out parameters);
    var sqlUtil = m_sqlUtilProvider.Get(m_siteSettings.ControlDatabaseConnString);
    return sqlUtil.GetSource(typeof(NamespaceConnectionInfo), sqlUtil.GetReaderAsync(sql, parameters)).Cast<NamespaceConnectionInfo>();
}
public IObservable<DbDataReader> GetReaderAsync(string query, IList<SqlParameter> parameters = null, CommandBehavior commandBehavior = CommandBehavior.Default)
{
    return Observable.FromAsync(async () =>
    {
        SqlCommand command = null;
        try
        {
            var conn = await GetConnectionAsync();
            command = GetCommand(conn, query, parameters);
            return (DbDataReader)await command.ExecuteReaderAsync(commandBehavior | CommandBehavior.CloseConnection);
        }
        finally
        {
            DisposeSilently(command);
        }
    });
}
public IObservable<object> GetSource(Type objectType, IObservable<DbDataReader> readerTask)
{
    return Observable.Create<object>(async (obs, ct) => await PopulateSource(objectType, await readerTask, true, obs, ct));
}
private static async Task PopulateSource(Type objectType, DbDataReader reader, bool disposeReader, IObserver<object> obs, CancellationToken ct)
{
    try
    {
        if (IsPrimitiveDataType(objectType))
        {
            while (await reader.ReadAsync(ct))
            {
                obs.OnNext(reader[0]);
            }
        }
        else
        {
            // Get all the properties in our Object
            var typeReflector = objectType.GetTypeReflector(TypeReflectorCreationStrategy.PREPARE_DATA_RECORD_CONSTRUCTOR);
            // For each property get the data from the reader to the object
            while (await reader.ReadAsync(ct))
            {
                obs.OnNext(typeReflector.DataRecordConstructor == null ?
                    ReadNextObject(typeReflector, reader) :
                    typeReflector.DataRecordConstructor(reader));
            }
        }
    }
    catch (OperationCanceledException)
    {
    }
    finally
    {
        if (disposeReader)
        {
            reader.Dispose();
        }
    }
}

如何在IObservable管道中保存异常,并在结束时重新抛出它

调用m_namespaceManager.GetNamespaceConnectionInfoSource(true, drainAndDisable: false)返回一个IObservable<NamespaceConnectionInfo>。现在,任何单个可观察对象的契约是这样的:

OnNext*(OnError|OnCompleted)

这意味着您将得到零个或多个值,后面跟着一个,并且只有一个错误值或补全值。

你不能从一个可观察对象中获得多个错误,也不能在获得错误后获得值。

如果你的可观察对象没有返回一个以上的错误,它就违反了正常的Rx契约。

因此,根据现有的代码,你不可能将错误延迟到可观察对象的结束,因为错误在可观察对象的结束。

你能做的是改变你在GetNamespaceConnectionInfoSource中产生你的值的方式,这样它在合并它们之前会生成多个序列调用.Materialize()。这意味着您将有一个IObservable<Notification<NamespaceConnectionInfo>>,并且在整个流中可能有多个错误和完成。然后,您可以对该流进行分组,并在处理错误之前处理这些值。但这一切都取决于GetNamespaceConnectionInfoSource的变化,因为你还没有发布源代码,我不能给你正确的代码。

为了帮助理解这一点,请看下面的代码:
var xs = new [] { 1, 2, 3, 0, 4, 0, 5 }.ToObservable();
xs
    .Select(x =>
    {
        if (x == 0)
            throw new NotSupportedException();
        else
            return x;
    })
    .Subscribe(
        x => Console.WriteLine(x),
        ex => Console.WriteLine(ex.ToString()));

结果如下:

1
2
3
System.NotSupportedException: Specified method is not supported.
   at UserQuery.<Main>b__0(Int32 x) in query_ioaahp.cs:line 45
   at System.Reactive.Linq.ObservableImpl.Select`2._.OnNext(TSource value)

4 &5根本不会产生。

现在看看这段代码:

xs
    .Select(x =>
        Observable
            .Start(() =>
            {
                if (x == 0)
                    throw new NotSupportedException();
                else
                    return x;
            })
            .Materialize())
    .Merge()
    .Where(x => x.Kind != NotificationKind.OnCompleted)
    .Subscribe(
        x => Console.WriteLine(String.Format(
            "{0} {1}",
            x.Kind,
            x.HasValue ? x.Value.ToString() : "")),
        ex => Console.WriteLine(ex.ToString()));

生成以下内容:

OnNext 1
OnNext 4
OnError 
OnError 
OnNext 5
OnNext 3
OnNext 2

由于引入了并行性,它是无序的。

但是现在你可以处理所有的错误。

Concat能解决你的问题吗?我已经用Finally将其包装在Observable.Create中,以完成faults主题。

public IObservable<DataManagementWorkItem> GetWorkItemSource(int maxConcurrentCalls)
{
    return Observable.Create<DataManagementWorkItem>((observer) =>
    {
        var faults = new Subject<DataManagementWorkItem>();
        return m_namespaceManager
            .GetNamespaceConnectionInfoSource(true, drainAndDisable: false)
            .Catch<NamespaceConnectionInfo, Exception>(exc =>
            {
                faults.OnError(exc);
                return Observable.Empty<NamespaceConnectionInfo>();
            })
            .Take(maxConcurrentCalls)
            .Select(nci => GetPolicySourceForNamespace(nci))
            .Merge()
            .Where(IsValid)
            .Select(ToWorkItem)
            .Where(o => o != null)
            .Finally(() => faults.OnCompleted())
            .Concat(faults)
            .Subscribe(observer);
    });
}

同样,这是否返回您所期望的?(24在你的测试)

m_namespaceManager
    .GetNamespaceConnectionInfoSource(true, drainAndDisable: false)
    .Catch<NamespaceConnectionInfo, Exception>(exc =>
    {
        faults.OnError(exc);
        return Observable.Empty<NamespaceConnectionInfo>();
    })
    .Count()

是的,基本问题是Merge有一个失败的快速实现。如果源可观察对象产生错误,或者任何一个内部可观察对象产生错误,那么Merge将使流失败,而不等待剩余的内部可观察对象完成。

要实现你想要的,你需要在merge看到错误之前"捕获"它,并在内部可观察对象完成后"重新抛出"它:

public IObservable<DataManagementWorkItem> GetWorkItemSource(int maxConcurrentCalls)
{
    // wrap within Observable.Defer
    // so that each new subscription
    // gets its own Error subject
    return Observable.Defer(() =>
    {
        var error = new ReplaySubject<DataManagementWorkItem>(1);
        return m_namespaceManager
            .GetNamespaceConnectionInfoSource(true, drainAndDisable: false)
            .Catch(err =>
            {
                error.OnError(err);
                return Observable.Empty<NamespaceConnectionInfo>();
            })
            .Finally(error.OnCompleted)
            .Select(nci => Observable.Defer(() => GetPolicySourceForNamespace(nci)))
            .Merge(maxConcurrentCalls)
            .Where(IsValid)
            .Select(ToWorkItem)
            .Where(o => o != null)
            .Concat(error);
    });
}

另外,我注意到单元测试将两次订阅到返回的可观察对象,这增加了您的困惑。一次是调用Subscribe来填充列表,另一次是调用await。你真的只想订阅一次。我们可以使用.Do运算符来填充列表,并且应该能够在错误处理程序中检查它:

var dm = DependencyResolver.Instance.GetInstance<IDataManagement>();
var workItems = new List<DataManagementWorkItem>();
try
{
    var obs = dm.GetWorkItemSource(10).Do(workItems.Add);
    await obs;
    Assert.Fail("An expected exception was not thrown");
}
catch (Exception exc)
{
    AssertTheRightException(exc);
    // workItems should be populated.
}