可观察的取消令牌

本文关键字:令牌 取消 观察 | 更新日期: 2023-09-27 18:34:17

如果以下类型的

Rx 可观察量是在单击开始按钮时创建的,即从停止按钮创建,我如何取消以下类型的 Rx 可观察量。

var instance = ThreadPoolScheduler.Instance; 
Observable.Interval(TimeSpan.FromSeconds(2), instance)
                     .Subscribe(_ =>
                     {
                     Console.WriteLine(DateTime.Now); // dummy event
                     }
                     );         

可观察的取消令牌

只需使用需要CancellationTokenSubscribe重载之一:

observable.Subscribe(_ => Console.WriteLine(DateTime.UtcNow), cancellationToken);

这简化了Jon Skeet的例子:

using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;
class Program
{
    static void Main(string[] args)
    {
        var instance = ThreadPoolScheduler.Instance;
        var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
        Observable.Interval(TimeSpan.FromSeconds(0.5), instance)
            .Subscribe(_ => Console.WriteLine(DateTime.UtcNow), cts.Token);
        Thread.Sleep(10000);
    }
}

保留 Subscribe 返回的IDisposable,并对其调用Dispose

很可能有一种方法可以将基于 Rx IDisposable 的取消订阅与开箱即用的CancellationToken集成,但只需调用Dispose将是一个开始。(您始终可以使用取消令牌注册延续以调用 dispose...

下面是一个简短但完整的示例来演示:

using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;
class Program
{
    static void Main(string[] args)
    {
        var instance = ThreadPoolScheduler.Instance;
        var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
        var disposable = Observable
            .Interval(TimeSpan.FromSeconds(0.5), instance)
            .Subscribe(_ => Console.WriteLine(DateTime.UtcNow));
        cts.Token.Register(() => disposable.Dispose());
        Thread.Sleep(10000);
    }
}