在Rx中使用Join时异常

本文关键字:Join 异常 Rx | 更新日期: 2023-09-27 18:05:25

在试图了解Rx引发的异常的底部时,我提出了以下reppro:

[Fact]
public void REPRO()
{
    // when true, window is open to accept requests
    var isWindowOpen = new BehaviorSubject<bool>(true);
    // when this ticks and the window is open, we want to execute the logic
    var request = new Subject<Unit>();
    // determines when the execution window opens
    var openWindow = isWindowOpen
        .Where(ce => ce);
    // determines when the execution window closes
    var closeWindow = isWindowOpen
        .Where(ce => !ce);
    var executionCount = 0;
    var result = Observable
        .Join(
            openWindow,
            request,
            _ => closeWindow,
            _ => Observable.Empty<Unit>(),
            (l, r) => Unit.Default)
        .Do(_ => ++executionCount)
        // changing isWindowOpen from false to true here is causing Rx to throw an exception
        .SelectMany(_ => Observable.Return(Unit.Default).Do(__ => isWindowOpen.OnNext(false)).Do(__ => isWindowOpen.OnNext(true)))
        .Subscribe();
    // this line causes Rx to throw an exception: System.InvalidOperationException: Collection was modified; enumeration operation may not execute.
    request.OnNext(Unit.Default);
    Assert.Equal(1, executionCount);
}

在指定的行上,抛出以下异常:

System.InvalidOperationException: Collection was modified; enumeration operation may not execute.
   at System.ThrowHelper.ThrowInvalidOperationException(ExceptionResource resource)
   at System.Collections.Generic.Dictionary`2.ValueCollection.Enumerator.MoveNext()
   at System.Reactive.Linq.ObservableImpl.Join`5._.RightObserver.OnNext(TRight value)
   at System.Reactive.Subjects.Subject`1.OnNext(T value)
   at RxJoinTest.Class1.InvokeCommandInvokesTheCommand() in c:'users'kent'documents'visual studio 2015'Projects'RxJoinTest'RxJoinTest'Class1.cs:line 57

注意注释行,它解释了只有当我在连接项的管道中修改isWindowOpen可观察对象时才会发生异常。

谁能解释这个(看似错误的)行为?或者,谁能提供另一种方法来实现我的目标,同时规避这个问题?

我的实际场景是这样的:窗口打开/关闭是一个命令,分别变为可用和不可执行。请求表示执行命令的愿望,而执行的逻辑(在SelectMany中)是命令逻辑本身。作为其内部逻辑的一部分,该命令禁用然后重新启用自己,这就是isWindowOpen最终在执行期间被修改的原因。

在Rx中使用Join时异常

您的Join代码中的SelectMany可以简化为这样,这更简单:

var result = Observable.Join(
        openWindow,
        request,
        _ => closeWindow,
        _ => Observable.Empty<Unit>(),
        (l, r) => Unit.Default
    )
    .Do(_ => ++executionCount)
    // The two lines from the SelectMany. You don't even need both; either one of these lines triggers the error
    .Do(__ => isWindowOpen.OnNext(false))
    .Do(__ => isWindowOpen.OnNext(true))
    .Subscribe();

简短的回答是,右窗口的打开不能触发左窗口的变化,左窗口的打开也不能触发右窗口的变化。

在内部,每当打开一个左窗口时,Rx遍历所有打开的右窗口并执行Join选择器。由于您的.Do 's在连接选择器之后立即同步执行,因此它们在righttwindowiterator仍然打开时发生,从而触发对集合RightWindowIterator迭代的修改,因此您的错误。您可以查看源代码来确认这一点。

这个限制是有意义的。如果您将右twindowclose函数从Observable.Empty<Unit>()更改为Observable.Never<Unit>(),而Rx没有此限制,则会出现无限循环。

就帮助你实现目标而言,你还没有真正地描述它们。在.Do之前添加延迟将导致它们异步运行,因此迭代器不会保持打开状态,这将使代码运行:

var result = openWindow.Join(
        request,
        _ => closeWindow,
        _ => Observable.Empty<Unit>(),
        (l, r) => Unit.Default
    )
    .Do(_ => ++executionCount)
    .Delay(TimeSpan.FromMilliseconds(1))
    .Do(__ => isWindowOpen.OnNext(false))
    .Do(__ => isWindowOpen.OnNext(true))
    .Subscribe();

…然而,这与可靠的健壮代码相去甚远。我建议尽可能地消除subject,特别是Join中围绕它们的循环逻辑。


编辑:

你可以这样做:

var result = request
    .WithLatestFrom(isWindowOpen, (_, windowOpen) => windowOpen)
    .Where(windowOpen => windowOpen)
    .Do(_ => ++executionCount)
    // changing isWindowOpen from false to true here is causing Rx to throw an exception
    .SelectMany(_ => Observable.Return(Unit.Default).Do(__ => isWindowOpen.OnNext(false)).Do(__ => isWindowOpen.OnNext(true)))
    .Subscribe();