确保按顺序处理多个一次性用品

本文关键字:一次 性用品 顺序处理 确保 | 更新日期: 2023-09-27 17:50:38

我有两个需要按顺序处理的IDisposables。排序很重要,因为第一个IDisposable杀死了一个依赖于将被第二个IDisposable杀死的服务的Rx订阅。这是在Windows窗体应用程序中,IObservable的订阅需要发生在不同的线程上,但观察和处理需要发生在UI线程上。(实际上,只要顺序得到保证,我并不关心处理是否发生在UI线程上。)因此,在代码中,我大致有以下内容(一次简化):

SomeService = new DisposableService();
Subscription = Foo(someService).SubscribeOn(NewThreadScheduler.Default).ObserveOn(theForm).Subscribe(...)

对于许多UI事件,我需要按顺序处理这两个事件(Subscription,然后是SomeService)。为此,我尝试使用Rx的CompositeDisposableContextDisposable在同一线程上提供串行处理:

_Disposable = new CompositeDisposable(new[] {                     
    new ContextDisposable(WindowsFormsSynchronizationContext.Current, Subscription),                     
    new ContextDisposable(WindowsFormsSynchronizationContext.Current, SomeService)});
然而,上面的

不起作用。根据我的日志记录,_DisposableSomeServiceContextDisposable在同一个线程上被调用,但ContextDisposable仍然在与被处置的服务并发的不同线程上发生(从而导致竞争条件和npe)。

我只编程c#几个星期,所以我确定问题是我误解了上下文和调度程序的工作方式。解决这个问题的正确方法是什么?

确保按顺序处理多个一次性用品

除非我误解了什么,否则您可以控制哪个线程处理什么。谁订阅哪个线程并不重要。看看这个例子

internal class Program
{
    private static void Main(string[] args)
    {
        ReactiveTest rx1 = null;
        ReactiveTest rx2 = null;
        var thread1 = new Thread(() => rx1 = new ReactiveTest());
        var thread2 = new Thread(() => rx2 = new ReactiveTest());
        thread1.Start();
        thread2.Start();
        Thread.Sleep(TimeSpan.FromSeconds(1));
        thread1.Join();
        thread2.Join();
        rx1.Dispose();
        rx2.Dispose();
    }
}
public class ReactiveTest : IDisposable
{
    private IDisposable _timerObservable;
    private object _lock = new object();
    public ReactiveTest()
    {
        _timerObservable = Observable.Interval(TimeSpan.FromMilliseconds(250)).Subscribe(i => 
            Console.WriteLine("[{0}] - {1}", Thread.CurrentThread.ManagedThreadId, i));
    }
    public void Dispose()
    {
        lock (_lock)
        {
            _timerObservable.Dispose();
            Console.WriteLine("[{0}] - DISPOSING", Thread.CurrentThread.ManagedThreadId);
        }
    }
}

输出

[14] - 0
[7] - 0
[15] - 1
[7] - 1
[14] - 2
[15] - 2
[10] - DISPOSING
[10] - DISPOSING

你可以看到我们在两个独立的线程上订阅,然后在第三个线程上释放。我只是锁定了dispose,以防在订阅中需要发生线程安全的事情。

SubscribeOn调度对SubscribeDispose的调用。因此,在Subscription变量上调用Dispose,无论当前是否在UI线程上执行,都会导致订阅被NewThreadScheduler.Default调度处置。

使用SubscribeOn几乎从来不是个好主意;然而,在您的案例中,您声称它解决了50%的问题——这比我见过的大多数使用都多50%——所以我必须质疑您是否真的需要在后台线程上执行订阅。如果方法所做的只是开始一些异步工作,如发送网络请求或读取文件,那么创建一个全新的线程然后调用它的方法,与直接调用UI线程上的方法相比,代价是非常昂贵的。如果计算要发送的网络消息被证明过于耗时,那么使用SubscribeOn可能是正确的;虽然,当然,只有当您也希望安排处置时才会这样做。

如果对可观察对象的订阅必须在后台线程中执行,而dispose必须保持自由线程状态,则考虑使用以下操作符(未经测试)。

public static class ObservableExtensions
{
  public static IObservable<TSource> SubscribeOn<TSource>(
    this IObservable<TSource> source,
    bool doNotScheduleDisposal, 
    IScheduler scheduler)
  {
    if (!doNotScheduleDisposal)
    {
      return source.SubscribeOn(scheduler);
    }
    return Observable.Create<TSource>(observer =>
      {
        // Implementation is based on that of the native SubscribeOn operator in Rx
        var s = new SingleAssignmentDisposable();
        var d = new SerialDisposable();
        d.Disposable = s;
        s.Disposable = scheduler.Schedule(() =>
        {
          d.Disposable = source.SubscribeSafe(observer);
        });
        return d;
      });
  }
}