具有外部状态的 Rx

本文关键字:Rx 状态 外部 | 更新日期: 2023-09-27 18:30:56

Rx 与外部状态?

因此,在此示例中,Rx 功能与外部状态完整行为相结合。Rx实现这一目标的最佳方法是什么?

有问题的代码位置是"updateActive"。

public enum Source
{
    Observable1,
    Observable2,
}
// Type is Source.Observable1
IObservable<Source> observable1;
// Type is Source.Observable2
IObservable<Source> observable2;
var mergedObservables = observable1.Select(x => Source.Observable1)
    .Merge(observable2.Select(x => Source.Observable2));
var updateActive = false;
mergedObservables.Subscribe(x =>
{
    switch (x.Source)
    {
        case Source.Observable1:
        {
            if (updateActive)
                break;
            updateActive = true;
            // Here is some code which causes that observable2 will get some new values. 
            // (this coud be also on an other thread)
            // If this is the case, the new value(s) should be ignored.
            updateActive = false;
        }
        break;
        case Source.Observable2:
        {
            if (updateActive)
                break;
            updateActive = true;
            // Here is some code which causes that observable1 will get some new values. 
            // (this coud be also on an other thread)
            // If this is the case, the new value(s) should be ignored.
            updateActive = false;
        }
        break;
    }
});

备注:如何在 Rx 运算符中传输"updateActive"状态

具有外部状态的 Rx

您可以使用 Subject<bool> 实例作为状态运营商。

鉴于:

Subject<bool> isUpdating = new Subject<bool>();

您将在类似这样的事情中使用它:

var flaggedObservables = mergedObservables
    .CombineLatest(isUpdating, (obs, flag) => new {obs, IsUpdating = flag});
flaggedObservables
    .Where(data => !data.IsUpdating)
    .Select(data => data.obs)
    .DistinctUntilChanged()
    .Subscribe(
        obs =>
        {
            isUpdating.OnNext(true);
            //do some work on obs
            isUpdating.OnNext(false);
        });

如果你有一个代表更新的Task,这实际上可能会起作用(虽然我不知道我是否在这里正确捕获了你想要的语义,因为从你最初的帖子来看,你似乎忽略了源中的所有元素无论如何)。

public enum Source
{
    Observable1,
    Observable2,
}
// Type is Source.Observable1
IObservable<Source> observable1;
// Type is Source.Observable2
IObservable<Source> observable2;
var mergedObservables = observable1.Select(x => Source.Observable1)
    .Merge(observable2.Select(x => Source.Observable2));
mergedObservables 
    .Scan(
        new
        {
            Value = (Source?)null,
            CurrentUpdateTask = (Task)null
        },
        (tuple, value)
        {
            if ((tuple.CurrentUpdateTask == null) || (tuple.CurrentUpdateTask.IsCompleted))
            {
                // No update running. Start updating.
                return new
                {
                    Value = value,
                    CurrentUpdateTask = Update()  //Some Task-returning method that does the update.
                };
            }
            // Update in flight. Ignore value.
            return new
            {
                Value = (Source?)null,
                tuple.CurrentUpdateTask
            };
        })
    .Where(tuple => tuple.Value.HasValue)
    .Select(tuple.Value.Value)
    .Subscribe(...);

只需将Where子句添加到您的observable1中并observable2即可。 使用 System.Threading.Interlocked 确保将isActive的值传播到其他线程。 请注意,始终存在一个争用条件,即值可以同时到达两个可观察的上。 两者都最终会执行,尽管不是同时执行。 此代码仅停止在值为 true 时生成的值。

类型是源.可观察1 IO可观察1;

// Type is Source.Observable2
IObservable<Source> observable2;
int isActive = 0;
var mergedObservables = Observable.Merge(
    observable1
        .Where(t => Interlocked.CompareExchange(ref isActive, 1, 2) == 0)
        .Select(x => Source.Observable1),
    observable2
        .Where(t => Interlocked.CompareExchange(ref isActive, 1, 2) == 0)
        .Select(x => Source.Observable2));
mergedObservables.Subscribe(x =>
{
    switch (x.Source)
    {
        case Source.Observable1:
        {
            Interlocked.Exchange(ref isActive, 1);
            // Here is some code which causes that observable2 will get some new values. 
            // (this coud be also on an other thread)
            // If this is the case, the new value(s) should be ignored.
            Interlocked.Exchange(ref isActive, 0);
        }
        break;
        case Source.Observable2:
        {
            Interlocked.Exchange(ref isActive, 1);
            // Here is some code which causes that observable1 will get some new values. 
            // (this coud be also on an other thread)
            // If this is the case, the new value(s) should be ignored.
            Interlocked.Exchange(ref isActive, 0);
        }
        break;
    }
});