为什么每个观察代理都在一个新线程上运行

本文关键字:新线程 线程 运行 一个 代理 为什么 观察 | 更新日期: 2023-09-27 18:23:53

在Rx中,当使用Scheduler.NewThread进行ObserveOn方法时,当Rx已经保证OnNext永远不会重叠时,让每个观察委托(OnNext)在新线程上运行有什么好处。如果每一个OnNext都将被一个接一个地调用,为什么需要为每一个调用新的线程呢。

我理解为什么要在不同于订阅和应用程序线程的线程上运行观察委派,但在新线程上运行每个观察委派,而它们永远不会并行运行?。。。。对我来说没有意义,还是我错过了什么?

例如

using System;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;
namespace RxTesting
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Application Thread : {0}", Thread.CurrentThread.ManagedThreadId);
            var numbers = from number in Enumerable.Range(1,10) select Process(number);
            var observableNumbers = numbers.ToObservable()
                .ObserveOn(Scheduler.NewThread)
                .SubscribeOn(Scheduler.NewThread);
            observableNumbers.Subscribe(
                n => Console.WriteLine("Consuming : {0} 't on Thread : {1}", n, Thread.CurrentThread.ManagedThreadId));
            Console.ReadKey();
        }
        private static int Process(int number)
        {
            Thread.Sleep(500);
            Console.WriteLine("Producing : {0} 't on Thread : {1}", number,
                              Thread.CurrentThread.ManagedThreadId);
            return number;
        }
    }
}

上面的代码产生以下结果。请注意,每次消耗都是在一个新线程上完成的。

Application Thread : 8
Producing : 1    on Thread : 9
Consuming : 1    on Thread : 10
Producing : 2    on Thread : 9
Consuming : 2    on Thread : 11
Producing : 3    on Thread : 9
Consuming : 3    on Thread : 12
Producing : 4    on Thread : 9
Consuming : 4    on Thread : 13
Producing : 5    on Thread : 9
Consuming : 5    on Thread : 14
Producing : 6    on Thread : 9
Consuming : 6    on Thread : 15
Producing : 7    on Thread : 9
Consuming : 7    on Thread : 16
Producing : 8    on Thread : 9
Consuming : 8    on Thread : 17
Producing : 9    on Thread : 9
Consuming : 9    on Thread : 18
Producing : 10   on Thread : 9
Consuming : 10   on Thread : 19

为什么每个观察代理都在一个新线程上运行

NewThread调度程序对于长时间运行的订阅者非常有用。如果您没有指定任何调度程序,则生产者将被阻止等待订阅者完成。通常,您可以使用Scheduler.ThreadPool,但如果您希望有许多长时间运行的任务,您不想用它们阻塞线程池(因为它可能不仅仅由单个可观察对象的订阅者使用)。

例如,考虑对您的示例进行以下修改。我将延迟移动到订阅者,并添加了主线程何时准备好进行键盘输入的指示。请注意,取消对NewThead行的注释时会出现差异。

using System;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;
namespace RxTesting
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Application Thread : {0}", Thread.CurrentThread.ManagedThreadId);
            var numbers = from number in Enumerable.Range(1, 10) select Process(number);
            var observableNumbers = numbers.ToObservable()
//              .ObserveOn(Scheduler.NewThread)
//              .SubscribeOn(Scheduler.NewThread)
            ;
            observableNumbers.Subscribe(
                n => {
                    Thread.Sleep(500);
                    Console.WriteLine("Consuming : {0} 't on Thread : {1}", n, Thread.CurrentThread.ManagedThreadId);
                });
            Console.WriteLine("Waiting for keyboard");
            Console.ReadKey();
        }
        private static int Process(int number)
        {
            Console.WriteLine("Producing : {0} 't on Thread : {1}", number,
                              Thread.CurrentThread.ManagedThreadId);
            return number;
        }
    }
}

那么,为什么Rx不进行优化,为每个订阅者使用相同的线程呢?如果订阅服务器运行时间太长,以至于您需要一个新线程,那么线程创建开销无论如何都是微不足道的。一个例外是,如果大多数订阅者都很短,但也有少数订阅者在长时间运行,那么重用同一线程的优化确实会很有用。

我不确定你是否注意到了,但如果消费者比生产者慢(例如,如果你在订阅操作中添加了更长的睡眠),他们将共享同一个线程,因此这可能是一种确保订阅者在内容发布后立即消费内容的机制。

namespace RxTesting
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Application Thread : {0}", Thread.CurrentThread.ManagedThreadId);
            var numbers = from number in Enumerable.Range(1,10) select Process(number);
            var observableNumbers = numbers.ToObservable()
                .ObserveOn(Scheduler.NewThread)
                .SubscribeOn(Scheduler.NewThread);
            observableNumbers.Subscribe(
                n => 
                    {
                        Console.WriteLine("Consuming : {0} 't on Thread : {1}", n, Thread.CurrentThread.ManagedThreadId);
                        Thread.Sleep(600);
                    }
                        );
            Console.ReadKey();
        }
        private static int Process(int number)
        {
            Thread.Sleep(500);
            Console.WriteLine("Producing : {0} 't on Thread : {1}", number,
                              Thread.CurrentThread.ManagedThreadId);
            return number;
        }
    }
}

输出:

Application Thread : 1
Producing : 1    on Thread : 3
Consuming : 1    on Thread : 4
Producing : 2    on Thread : 3
Consuming : 2    on Thread : 4
Producing : 3    on Thread : 3
Consuming : 3    on Thread : 4
Producing : 4    on Thread : 3
Consuming : 4    on Thread : 4
Producing : 5    on Thread : 3
Consuming : 5    on Thread : 4
Producing : 6    on Thread : 3
Consuming : 6    on Thread : 4
Producing : 7    on Thread : 3
Producing : 8    on Thread : 3
Consuming : 7    on Thread : 4
Producing : 9    on Thread : 3
Consuming : 8    on Thread : 4
Producing : 10   on Thread : 3
Consuming : 9    on Thread : 4
Consuming : 10   on Thread : 4