在SubscribedOn被使用后,控制在哪个线程上处置Rx订阅

本文关键字:线程 订阅 Rx 控制 SubscribedOn | 更新日期: 2023-09-27 17:50:24

我有一个Rx订阅,我用不同的线程SubscribeOn来防止它阻塞。但是,我希望由于资源管理问题而阻止该订阅的处置。我还没能弄清楚如何在控制台应用程序或winforms应用程序的上下文中实现这一点(我有两个用例)。下面是一个简化案例的工作代码,它模拟了我正在做的事情:

internal class Program
{
    private static void Log(string msg)
    {
        Console.WriteLine("[{0}] " + msg, Thread.CurrentThread.ManagedThreadId.ToString());
    }
    private static void Main(string[] args)
    {
        var foo = Observable.Create<long>(obs =>
            {
                Log("Subscribing starting.. this will take a few seconds..");
                Thread.Sleep(TimeSpan.FromSeconds(2));
                var sub =
                    Observable.Interval(TimeSpan.FromSeconds(1))
                              .Do(_ => Log("I am polling..."))
                              .Subscribe(obs);
                return Disposable.Create(() =>
                    {
                        Thread.Sleep(TimeSpan.FromSeconds(3));
                        sub.Dispose();
                        Log("Disposing is really done now!");
                    });
            });
        Log("I am subscribing..");
        var disp = foo.SubscribeOn(NewThreadScheduler.Default).Subscribe(i => Log("Processing " + i.ToString()));
        Log("I have returned from subscribing...");
        // SC.Current is null in a ConsoleApp :/  Can I get a SC that uses my current thread?
        //var dispSynced = new ContextDisposable(SynchronizationContext.Current, disp);
        Thread.Sleep(TimeSpan.FromSeconds(5));
        Log("I'm going to dispose...");
        //dispSynced.Dispose();
        disp.Dispose();
        Log("Disposed has returned...");
        Console.ReadKey();
    }
}

当上面的代码运行时,我得到:

[10] I am subscribing..
[10] I have returned from subscribing...
[11] Subscribing starting.. this will take a few seconds..
[6] I am polling...
[6] Processing 0
[6] I am polling...
[6] Processing 1
[10] I'm going to dispose...
[10] Disposed has returned...
[13] I am polling...
[6] I am polling...
[13] I am polling...
[14] Disposing is really done now!

所以,我所要做的就是让[10] Disposed has returned...成为打印的最后一行,表明Dispose调用正在阻塞。

Rx附带的ContextDisposable似乎是我用例的理想选择,但我不知道如何获得代表我当前线程的SynchronizationContext。是否有一种方法,我可以使用ContextDisposable做我想做的,或者我需要一个完全不同的方法?

在SubscribedOn被使用后,控制在哪个线程上处置Rx订阅

如果查看SubscribeOn的源代码,就会发现处置函数似乎将在指定的调度程序上调度。试试这样做:

private static IObservable<long> GetObservable(IScheduler scheduler)
{
  return Observable.Create<long>(obs =>
  {
    var disposables = new CompositeDisposable();
    disposables.Add(
      Disposable.Create(() =>
      {
        Thread.Sleep(TimeSpan.FromSeconds(3));
        Log("Disposing is really done now!");
      }));
    disposables.Add(
      scheduler.Schedule(() =>
      {
        Log("Subscribing starting.. this will take a few seconds..");
        Thread.Sleep(TimeSpan.FromSeconds(2));
        disposables.Add(
          Observable.Interval(TimeSpan.FromSeconds(1)).Do(_ => Log("I am polling...")).Subscribe(obs));
      }));
    return disposables;
  });

private static void Main(string[] args)
{
  var foo = GetObservable(NewThreadScheduler.Default);
  Log("I am subscribing..");
  var disp = foo.Subscribe(i => Log("Processing " + i.ToString()));
  Log("I have returned from subscribing...");
  // SC.Current is null in a ConsoleApp :/  Can I get a SC that uses my current thread?
  //var dispSynced = new ContextDisposable(SynchronizationContext.Current, disp);
  Thread.Sleep(TimeSpan.FromSeconds(5));
  Log("I'm going to dispose...");
  //dispSynced.Dispose();
  disp.Dispose();
  Log("Disposed has returned...");
  Console.ReadKey();
}

(从头开始-还有另一种方法!)

这是一件有趣的事情——说实话,我以前从来没有需要过这个,但是有一个可观察的扩展Observable.Synchronize:

这使得阻塞相当琐碎,虽然我不能100%确定这将适用于您的用例…无论如何,下面是使用这种方法修改的main:

private static void Main(string[] args)
{
    var sync = new object();
    var foo = Observable.CreateWithDisposable<long>(obs =>
    {
        Log("Subscribing starting.. this will take a few seconds..");
        Thread.Sleep(TimeSpan.FromSeconds(2));
        var sub = Observable.Interval(TimeSpan.FromSeconds(1))
                        .Do(_ => Log("I am polling..."))
                        .Synchronize(sync)
                        .Subscribe(obs);
        return Disposable.Create(
            () =>
                {
                    lock (sync)
                    {
                        Thread.Sleep(TimeSpan.FromSeconds(3));
                        sub.Dispose();
                        Log("Disposing is really done now!");                                
                    }
                });
    });
    Log("I am subscribing..");
    var disp = foo
        .SubscribeOn(Scheduler.NewThread)
        .Subscribe(i => Log("Processing " + i));
    Log("I have returned from subscribing...");
    Thread.Sleep(TimeSpan.FromSeconds(5));
    Log("I'm going to dispose...");
    disp.Dispose();
    Log("Disposed has returned...");
    Console.ReadKey();
}