Observable.Delay 或 Observable.Buffer 重用同一线程

本文关键字:Observable 一线 线程 Delay Buffer | 更新日期: 2023-09-27 18:36:32

是否有某个版本的 Observable.Delay 或 Observable.Buffer 不使用新线程作为其计时器?也许精度较低..

我有一个场景,我需要在一个可观察量上调用 Observable.Delay,它每秒产生数千条消息,从而创建大量线程。

谢谢。

Observable.Delay 或 Observable.Buffer 重用同一线程

如果你想限制线程的数量,那么你只需要考虑你使用的调度程序,这与创建的计时器数量无关。基于时间的操作在到期时间调度操作,并且调度程序决定如何在正确的时间调度事件 - Rx 对实际创建的计时器数量很聪明,并且此机制取决于所使用的调度程序。

基于时间的运算符(Scheduler.Default)使用的大多数平台上的默认调度程序将使用任务池来获取线程来调度在将来某个时间调度的事件,这就是为什么您通常会看到不同的线程用于调度事件的原因。

控制线程的一种方法是使用EventLoopScheduler,并确保在使用基于时间的操作时指定此值。这将在同一线程上调度它的所有事件。例如:

var scheduler = new EventLoopScheduler();
Observable.Return(1)
          .Delay(TimeSpan.FromSeconds(4), scheduler)
          .Subscribe(x => Console.WriteLine(x.ToString() + ": "
              + Thread.CurrentThread.ManagedThreadId));
Observable.Return(2).Delay(TimeSpan.FromSeconds(2), scheduler)
          .Subscribe(x => Console.WriteLine(x.ToString() + ": "
              + Thread.CurrentThread.ManagedThreadId));
Console.WriteLine("Done.");

将输出类似的东西(当然,ThreadId 会有所不同):

Done.
2: 3
1: 3

而如果将第一行更改为var scheduler = Scheduler.Default;,您将看到不同的线程 ID。

Rx 中的计时器主题非常复杂,对于这种格式来说可能有点过于宽泛 - 这篇优秀的文章涵盖了许多内部细节。请看"一切都是关于时间"的部分。