使用SubscribeOn(Scheduler.TaskPool)时避免Rx中的OnNext调用重叠

本文关键字:Rx 中的 OnNext 重叠 调用 SubscribeOn Scheduler TaskPool 使用 | 更新日期: 2023-09-27 18:22:34

我有一些使用Rx的代码,从多个执行以下操作的线程调用:

subject.OnNext(value); // where subject is Subject<T>

我想在后台处理这些值,所以我的订阅是

subscription = subject.ObserveOn(Scheduler.TaskPool).Subscribe(value =>
{
    // use value
});

我真的不在乎哪些线程处理来自Observable的值,只要工作被放入TaskPool并且不阻塞当前线程。但是,我在OnNext委托中使用的"value"不是线程安全的。目前,如果有很多值通过Observable,我会得到对我的OnNext处理程序的重叠调用。

我可以给我的OnNext代理添加一个锁,但这感觉不像Rx做事的方式。当我有多个线程调用subject.OnNext(value);时,确保一次只能调用一个OnNext处理程序的最佳方法是什么?

使用SubscribeOn(Scheduler.TaskPool)时避免Rx中的OnNext调用重叠

在MSDN 上使用主题

默认情况下,受试者不会在螺纹。[…]但是,如果使用调度程序同步对观察者的传出呼叫,您可以使用Synchronize方法。

因此,正如Brandon在评论中所说,你应该同步主题,并将其分发给你的制作人线程。例如

var syncSubject = Subject.Synchronize(subject);
// syncSubject.OnNext(value) can be used from multiple threads
subscription = syncSubject.ObserveOn(TaskPoolScheduler.Default).Subscribe(value =>
{
    // use value
});

我想您正在寻找.Synchronize()扩展方法。为了在最近的一个版本(2011年末)中获得性能改进,Rx团队放宽了关于可观察序列生产者的序列性质的假设。然而,你似乎打破了这些假设(这不是一件坏事),但为了让Rx像用户期望的那样重新播放,你应该同步序列,以确保它再次按顺序播放。

这里有更多关于为什么使用Synchronize的解释(第二段)。另一方面,如果您在代码中主动使用锁定,Synchronize可能会参与死锁,至少我亲眼目睹了这种情况。

您可以尝试实现自己的IObserver,它将在其OnNext方法中应用锁。只是一个简单的装饰器:应用一个锁,调用内部OnNext,移除锁。

然后,您可以在IObservable上实现一个扩展方法,类似于.AsThreadSafe().