如果在下一个事件到达之前未完成选择,如何取消RX中的选择

本文关键字:选择 何取消 取消 RX 事件 下一个 如果 未完成 | 更新日期: 2023-09-27 18:24:48

我有以下设置

IObservable<Data> source = ...;
source
    .Select(data=>VeryExpensiveOperation(data))
    .Subscribe(data=>Console.WriteLine(data));

通常情况下,这些事件是在合理的时间范围内分开发生的。想象一下,一个用户正在更新表单中的文本框。我们的VeryExpensiveOperation可能需要5秒才能完成,而它需要一个小时显示在屏幕上。

但是,如果在5秒内用户再次更新文本框我想向当前VeryExpensiveOperation发送取消在新的开始之前。

我会想象一个类似的场景

source
    .SelectWithCancel((data, cancelToken)=>VeryExpensiveOperation(data, token))
    .Subscribe(data=>Console.WriteLine(data));

因此,每次调用lambda时都会调用一个cancelToken,它可以用于管理取消CCD_ 3。然而,现在我们正在混合Task、CancelationToken和RX。不太确定如何将其组合在一起。任何建议。

了解如何使用XUnit测试操作员的奖励积分:)

首次尝试

    public static IObservable<U> SelectWithCancelation<T, U>( this IObservable<T> This, Func<CancellationToken, T, Task<U>> fn )
    {
        CancellationTokenSource tokenSource = new CancellationTokenSource();
        return This
            .ObserveOn(Scheduler.Default)
            .Select(v=>{
                tokenSource.Cancel();
                tokenSource=new CancellationTokenSource();
                return new {tokenSource.Token, v};
            })
            .SelectMany(o=>Observable.FromAsync(()=>fn(o.Token, o.v)));
    }

尚未测试。我希望一个未完成的任务生成一个IOobservable,它在不触发任何OnNext事件的情况下完成。

如果在下一个事件到达之前未完成选择,如何取消RX中的选择

您必须将VeryExpensiveOperation建模为一个可取消的异步事物。CCD_ 6或CCD_。我假设这是一个带有CancellationToken:的任务

Task<TResult> VeryExpensiveOperationAsync<TSource, TResult>(TSource item, CancellationToken token);

然后你这样做:

source
    .Select(item => Observable.DeferAsync(async token =>
    {
        // do not yield the observable until after the operation is completed
        // (ie do not just do VeryExpensiveOperation(...).ToObservable())
        // because DeferAsync() will dispose of the token source as soon
        // as you provide the observable (instead of when the observable completes)
        var result = await VeryExpensiveOperationAsync(item, token);
        return Observable.Return(result);
    })
    .Switch();

Select只是创建了一个延迟的可观察对象,当订阅时,它将创建一个令牌并启动操作。如果在操作完成之前取消订阅可观察项,则令牌将被取消。

Switch订阅来自Select的每个新的可观测数据,取消订阅它订阅的前一个可观测数据。

这有你想要的效果。

附言:这很容易测试。只需提供一个模拟源和一个使用单元测试提供的TaskCompletetionSource的模拟VeryExpensiveOperation,这样单元测试就可以准确地控制何时生成新的源项以及何时完成任务。类似这样的东西:

void SomeTest()
{
    // create a test source where the values are how long
    // the mock operation should wait to do its work.
    var source = _testScheduler.CreateColdObservable<int>(...);
    // records the actions (whether they completed or canceled)
    List<bool> mockActionsCompleted = new List<bool>();
    var resultStream = source.SelectWithCancellation((token, delay) =>
    {
        var tcs = new TaskCompletionSource<string>();
        var tokenRegistration = new SingleAssignmentDisposable();
        // schedule an action to complete the task
        var d = _testScheduler.ScheduleRelative(delay, () =>
        {
           mockActionsCompleted.Add(true);
           tcs.SetResult("done " + delay);
           // stop listening to the token
           tokenRegistration.Dispose();
        });
        // listen to the token and cancel the task if the token signals
        tokenRegistration.Disposable = token.Register(() =>
        {
           mockActionsCompleted.Add(false);
           tcs.TrySetCancelled();
           // cancel the scheduled task
           d.Dispose();
        });
        return tcs.Task;
    });
    // subscribe to resultStream
    // start the scheduler
    // assert the mockActionsCompleted has the correct sequence
    // assert the results observed were what you expected.
}

由于动态安排了新操作,使用testScheduler.Start()时可能会遇到问题。使用CCD_ 15的while循环可能工作得更好。

为什么不直接使用Throttle?

http://rxwiki.wikidot.com/101samples#toc30

Throttle停止事件流,直到在指定的时间段内不再生成事件为止。例如,如果将文本框的TextChanged事件限制为.5秒,则在用户停止键入.5秒之前不会传递任何事件。这在搜索框中很有用,因为您不希望在每次击键后开始新的搜索,而是希望等待用户暂停。

SearchTextChangedObservable = Observable.FromEventPattern<TextChangedEventArgs>(this.textBox, "TextChanged");
_currentSubscription = SearchTextChangedObservable.Throttle(TimeSpan.FromSeconds(.5)).ObserveOnDispatcher