具有外部状态的 Rx
本文关键字:Rx 状态 外部 | 更新日期: 2023-09-27 18:30:56
Rx 与外部状态?
因此,在此示例中,Rx 功能与外部状态完整行为相结合。Rx实现这一目标的最佳方法是什么?
有问题的代码位置是"updateActive"。
public enum Source
{
Observable1,
Observable2,
}
// Type is Source.Observable1
IObservable<Source> observable1;
// Type is Source.Observable2
IObservable<Source> observable2;
var mergedObservables = observable1.Select(x => Source.Observable1)
.Merge(observable2.Select(x => Source.Observable2));
var updateActive = false;
mergedObservables.Subscribe(x =>
{
switch (x.Source)
{
case Source.Observable1:
{
if (updateActive)
break;
updateActive = true;
// Here is some code which causes that observable2 will get some new values.
// (this coud be also on an other thread)
// If this is the case, the new value(s) should be ignored.
updateActive = false;
}
break;
case Source.Observable2:
{
if (updateActive)
break;
updateActive = true;
// Here is some code which causes that observable1 will get some new values.
// (this coud be also on an other thread)
// If this is the case, the new value(s) should be ignored.
updateActive = false;
}
break;
}
});
备注:如何在 Rx 运算符中传输"updateActive"状态
您可以使用 Subject<bool>
实例作为状态运营商。
鉴于:
Subject<bool> isUpdating = new Subject<bool>();
您将在类似这样的事情中使用它:
var flaggedObservables = mergedObservables
.CombineLatest(isUpdating, (obs, flag) => new {obs, IsUpdating = flag});
flaggedObservables
.Where(data => !data.IsUpdating)
.Select(data => data.obs)
.DistinctUntilChanged()
.Subscribe(
obs =>
{
isUpdating.OnNext(true);
//do some work on obs
isUpdating.OnNext(false);
});
如果你有一个代表更新的Task
,这实际上可能会起作用(虽然我不知道我是否在这里正确捕获了你想要的语义,因为从你最初的帖子来看,你似乎忽略了源中的所有元素无论如何)。
public enum Source
{
Observable1,
Observable2,
}
// Type is Source.Observable1
IObservable<Source> observable1;
// Type is Source.Observable2
IObservable<Source> observable2;
var mergedObservables = observable1.Select(x => Source.Observable1)
.Merge(observable2.Select(x => Source.Observable2));
mergedObservables
.Scan(
new
{
Value = (Source?)null,
CurrentUpdateTask = (Task)null
},
(tuple, value)
{
if ((tuple.CurrentUpdateTask == null) || (tuple.CurrentUpdateTask.IsCompleted))
{
// No update running. Start updating.
return new
{
Value = value,
CurrentUpdateTask = Update() //Some Task-returning method that does the update.
};
}
// Update in flight. Ignore value.
return new
{
Value = (Source?)null,
tuple.CurrentUpdateTask
};
})
.Where(tuple => tuple.Value.HasValue)
.Select(tuple.Value.Value)
.Subscribe(...);
只需将Where
子句添加到您的observable1
中并observable2
即可。 使用 System.Threading.Interlocked
确保将isActive
的值传播到其他线程。 请注意,始终存在一个争用条件,即值可以同时到达两个可观察的上。 两者都最终会执行,尽管不是同时执行。 此代码仅停止在值为 true 时生成的值。
类型是源.可观察1 IO可观察1;
// Type is Source.Observable2
IObservable<Source> observable2;
int isActive = 0;
var mergedObservables = Observable.Merge(
observable1
.Where(t => Interlocked.CompareExchange(ref isActive, 1, 2) == 0)
.Select(x => Source.Observable1),
observable2
.Where(t => Interlocked.CompareExchange(ref isActive, 1, 2) == 0)
.Select(x => Source.Observable2));
mergedObservables.Subscribe(x =>
{
switch (x.Source)
{
case Source.Observable1:
{
Interlocked.Exchange(ref isActive, 1);
// Here is some code which causes that observable2 will get some new values.
// (this coud be also on an other thread)
// If this is the case, the new value(s) should be ignored.
Interlocked.Exchange(ref isActive, 0);
}
break;
case Source.Observable2:
{
Interlocked.Exchange(ref isActive, 1);
// Here is some code which causes that observable1 will get some new values.
// (this coud be also on an other thread)
// If this is the case, the new value(s) should be ignored.
Interlocked.Exchange(ref isActive, 0);
}
break;
}
});