在Rx中使用Join时异常
本文关键字:Join 异常 Rx | 更新日期: 2023-09-27 18:05:25
在试图了解Rx引发的异常的底部时,我提出了以下reppro:
[Fact]
public void REPRO()
{
// when true, window is open to accept requests
var isWindowOpen = new BehaviorSubject<bool>(true);
// when this ticks and the window is open, we want to execute the logic
var request = new Subject<Unit>();
// determines when the execution window opens
var openWindow = isWindowOpen
.Where(ce => ce);
// determines when the execution window closes
var closeWindow = isWindowOpen
.Where(ce => !ce);
var executionCount = 0;
var result = Observable
.Join(
openWindow,
request,
_ => closeWindow,
_ => Observable.Empty<Unit>(),
(l, r) => Unit.Default)
.Do(_ => ++executionCount)
// changing isWindowOpen from false to true here is causing Rx to throw an exception
.SelectMany(_ => Observable.Return(Unit.Default).Do(__ => isWindowOpen.OnNext(false)).Do(__ => isWindowOpen.OnNext(true)))
.Subscribe();
// this line causes Rx to throw an exception: System.InvalidOperationException: Collection was modified; enumeration operation may not execute.
request.OnNext(Unit.Default);
Assert.Equal(1, executionCount);
}
在指定的行上,抛出以下异常:
System.InvalidOperationException: Collection was modified; enumeration operation may not execute.
at System.ThrowHelper.ThrowInvalidOperationException(ExceptionResource resource)
at System.Collections.Generic.Dictionary`2.ValueCollection.Enumerator.MoveNext()
at System.Reactive.Linq.ObservableImpl.Join`5._.RightObserver.OnNext(TRight value)
at System.Reactive.Subjects.Subject`1.OnNext(T value)
at RxJoinTest.Class1.InvokeCommandInvokesTheCommand() in c:'users'kent'documents'visual studio 2015'Projects'RxJoinTest'RxJoinTest'Class1.cs:line 57
注意注释行,它解释了只有当我在连接项的管道中修改isWindowOpen
可观察对象时才会发生异常。
谁能解释这个(看似错误的)行为?或者,谁能提供另一种方法来实现我的目标,同时规避这个问题?
我的实际场景是这样的:窗口打开/关闭是一个命令,分别变为可用和不可执行。请求表示执行命令的愿望,而执行的逻辑(在SelectMany
中)是命令逻辑本身。作为其内部逻辑的一部分,该命令禁用然后重新启用自己,这就是isWindowOpen
最终在执行期间被修改的原因。
您的Join
代码中的SelectMany
可以简化为这样,这更简单:
var result = Observable.Join(
openWindow,
request,
_ => closeWindow,
_ => Observable.Empty<Unit>(),
(l, r) => Unit.Default
)
.Do(_ => ++executionCount)
// The two lines from the SelectMany. You don't even need both; either one of these lines triggers the error
.Do(__ => isWindowOpen.OnNext(false))
.Do(__ => isWindowOpen.OnNext(true))
.Subscribe();
简短的回答是,右窗口的打开不能触发左窗口的变化,左窗口的打开也不能触发右窗口的变化。
在内部,每当打开一个左窗口时,Rx遍历所有打开的右窗口并执行Join选择器。由于您的.Do
's在连接选择器之后立即同步执行,因此它们在righttwindowiterator仍然打开时发生,从而触发对集合RightWindowIterator迭代的修改,因此您的错误。您可以查看源代码来确认这一点。
这个限制是有意义的。如果您将右twindowclose函数从Observable.Empty<Unit>()
更改为Observable.Never<Unit>()
,而Rx没有此限制,则会出现无限循环。
就帮助你实现目标而言,你还没有真正地描述它们。在.Do
之前添加延迟将导致它们异步运行,因此迭代器不会保持打开状态,这将使代码运行:
var result = openWindow.Join(
request,
_ => closeWindow,
_ => Observable.Empty<Unit>(),
(l, r) => Unit.Default
)
.Do(_ => ++executionCount)
.Delay(TimeSpan.FromMilliseconds(1))
.Do(__ => isWindowOpen.OnNext(false))
.Do(__ => isWindowOpen.OnNext(true))
.Subscribe();
…然而,这与可靠的健壮代码相去甚远。我建议尽可能地消除subject,特别是Join中围绕它们的循环逻辑。
编辑:
你可以这样做:
var result = request
.WithLatestFrom(isWindowOpen, (_, windowOpen) => windowOpen)
.Where(windowOpen => windowOpen)
.Do(_ => ++executionCount)
// changing isWindowOpen from false to true here is causing Rx to throw an exception
.SelectMany(_ => Observable.Return(Unit.Default).Do(__ => isWindowOpen.OnNext(false)).Do(__ => isWindowOpen.OnNext(true)))
.Subscribe();