如何取消一个可观察序列

本文关键字:观察 一个 何取消 取消 | 更新日期: 2023-09-27 18:03:06

我有一个非常简单的IObservable<int>,每500ms作为脉冲发生器:

var pulses = Observable.GenerateWithTime(0, i => true, i => i + 1, i => i,
                                         i => TimeSpan.FromMilliseconds(500))

我有一个CancellationTokenSource(用于取消同时进行的其他工作)。

我如何使用取消令牌源取消我的可观察序列?

如何取消一个可观察序列

这是一个老线程,但只是为了将来参考,这里有一个更简单的方法。

如果你有一个CancellationToken,你可能已经在处理任务了。因此,只需将其转换为Task并让框架执行绑定:

using System.Reactive.Threading.Tasks;
...
var task = myObservable.ToTask(cancellationToken);

这将创建一个内部订阅者,该订阅者将在任务取消时被处置。这在大多数情况下都行得通,因为大多数可观察对象只有在有订阅者的情况下才会产生值。

现在,如果你有一个实际的可观察对象,由于某种原因需要被处理(可能是一个热门的可观察对象,如果父任务被取消了,它就不再重要了),这可以通过continuation来实现:

disposableObservable.ToTask(cancellationToken).ContinueWith(t => {
    if (t.Status == TaskStatus.Canceled)
        disposableObservable.Dispose();
});

如果您正在使用GenerateWithTime(现在由在时间跨度函数重载中传递的Generate代替),您可以替换第二个参数来计算取消令牌的状态,如下所示:

var pulses = Observable.Generate(0,
    i => !ts.IsCancellationRequested,
    i => i + 1,
    i => i,
    i => TimeSpan.FromMilliseconds(500));

或者,如果导致取消令牌被设置的事件可以转换为可观察对象本身,你可以使用如下内容:

pulses.TakeUntil(CancelRequested);

我也在http://www.thinqlinq.com/Post.aspx/Title/Cancelling-a-Reactive-Extensions-Observable上发表了更详细的解释

这里有两个方便的操作符来取消可观察序列。它们之间的区别在于如果取消会发生什么。TakeUntil导致序列正常完成(OnCompleted),而WithCancellation导致异常终止(OnError)。

/// <summary>Returns the elements from the source observable sequence until the
/// CancellationToken is canceled.</summary>
public static IObservable<TSource> TakeUntil<TSource>(
    this IObservable<TSource> source, CancellationToken cancellationToken)
{
    return source
        .TakeUntil(Observable.Create<Unit>(observer =>
            cancellationToken.Register(() => observer.OnNext(default))));
}
/// <summary>Ties a CancellationToken to an observable sequence. In case of
/// cancellation propagates an OperationCanceledException to the observer.</summary>
public static IObservable<TSource> WithCancellation<TSource>(
    this IObservable<TSource> source, CancellationToken cancellationToken)
{
    return source
        .TakeUntil(Observable.Create<Unit>(o => cancellationToken.Register(() =>
            o.OnError(new OperationCanceledException(cancellationToken)))));
}

使用例子:

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
var pulses = Observable
    .Generate(0, i => true, i => i + 1, i => i, i => TimeSpan.FromMilliseconds(500))
    .WithCancellation(cts.Token);

注意:在取消的情况下,上面给出的自定义操作符会立即从底层可观察对象中取消订阅。如果观察到的包括副作用,这是需要考虑的。将TakeUntil(cts.Token)放在执行副作用的操作符之前,将推迟整个可观察对象的完成,直到副作用完成(优雅终止)。将其放在副作用之后将使取消成为即时的,从而可能导致任何正在运行的代码以一种即发即弃的方式继续运行而不被发现。

您可以使用以下代码段将IObservable订阅与CancellationTokenSource连接起来

var pulses = Observable.GenerateWithTime(0,
    i => true, i => i + 1, i => i,
    i => TimeSpan.FromMilliseconds(500));
// Get your CancellationTokenSource
CancellationTokenSource ts = ...
// Subscribe
ts.Token.Register(pulses.Subscribe(...).Dispose);

订阅后返回一个IDisposable实例。呼叫Dispose()