Rx 如何并行化长时间运行的任务
本文关键字:运行 任务 长时间 并行化 Rx | 更新日期: 2023-09-27 18:32:50
我有以下代码片段,它枚举某些 xml 的元素(从svn log --xml ...
进程的输出中读取(,然后为每个 xml 元素运行长时间运行的方法。
var proc = Process.Start(svnProcInfo);
var xml = XDocument.Load(proc.StandardOutput);
var xElements = xml.Descendants("path")
.ToObservable()
//.SubscribeOn(ThreadPoolScheduler.Instance)
.Select(descendant => return LongRunning(descendant));
xElements
//.SubscribeOn(NewThreadScheduler.Default)
.Subscribe(result => Console.WriteLine(result);
Console.ReadKey();
LongRunning
方法并不重要,但在其中我记录了它运行的线程。假设它运行了整整一秒钟。
我的问题是,取消评论SubscribeOn()
行没有任何效果。对LongRunning
的调用是连续的,并且每秒发生一次,在同一线程上(尽管与主(初始(线程不同(。
这是一个控制台应用程序。
我是Rx的新人。我错过了什么?
编辑:
在尝试了李·坎贝尔的答案后,我注意到了另一个问题。
Console.Error.WriteLine("Main thread " + Thread.CurrentThread.ManagedThreadId);
var xElements = xml.Descendants("path").ToObservable()
//.ObserveOn(Scheduler.CurrentThread)
.SelectMany(descendant =>
Observable.Start(()=>LongRunning(descendant),NewThreadScheduler.Default))
.Subscribe(result => Console.WriteLine(
"Result on: " + Thread.CurrentThread.ManagedThreadId));
[...]
string LongRunning(XElement el)
{
Console.WriteLine("Execute on: Thread " + Thread.CurrentThread.ManagedThreadId);
DoWork();
Console.WriteLine("Finished on Thread " + Thread.CurrentThread.ManagedThreadId);
return "something";
}
这将给出以下输出:
Main thread 1
Execute on: Thread 3
Execute on: Thread 4
Execute on: Thread 5
Execute on: Thread 6
Execute on: Thread 7
Finished on Thread 5
Finished on Thread 6
Result on: 5
Result on: 6
Finished on Thread 7
Result on: 7
Finished on Thread 3
Result on: 3
Finished on Thread 4
Result on: 4
Done! Press any key...
我需要的是一种将结果"排队"到同一线程的方法。我认为这就是ObserveOn()
的目的,但是取消注释上面的ObserveOn()
行不会改变结果。
首先,Rx是一个用于控制异步的库(或范式(,特别是可观察的序列。这里有一个可枚举的序列(Xml 后代(和一个阻塞/同步LongRunning
方法调用。
通过在可枚举序列上调用ToObservable()
,您实际上只是在遵守接口,但是随着您的序列实现(渴望而不是懒惰(,它没有什么真正的可观察/异步。
通过调用 SubscribeOn
,您的想法是正确的,但转换已经在ToObservable()
运算符中完成。您可能打算调用ToObservable(ThreadPoolScheduler.Instance)
以便可以在另一个线程上完成IEnumerable
的任何慢速迭代。然而。。。我认为这不会是一个缓慢的迭代器,所以这可能不会解决任何问题。
您最可能想要做的(如果 Rx 是此类问题的最佳工具,则值得怀疑(是安排对LongRunning
方法的调用。但是,这意味着您需要将异步添加到您的选择中。执行此操作的一个好方法是 Rx 工厂方法之一,例如 Observable.FromAsync
或 Observable.Start
.但是,这将使您的序列成为IObservable<IObservable<T>>
。您可以使用 SelectMany
或 Merge
将其展平。
说了这么多,我认为你想做的是:
var proc = Process.Start(avnProcInfo);
var xml = XDocument.Load(proc.StandardOutput);
//EDIT: Added ELS to serialise results onto a single thread.
var els = new EventLoopScheduler(threadStart=>new Thread(threadStart)
{
IsBackground=true,
Name="MyEventLoopSchedulerThread"
});
var xElements = xml.Descendants("path").ToObservable()
.SelectMany(descendant => Observable.Start(()=>LongRunning(descendant),ThreadPoolScheduler.Instance))
.ObserveOn(els)
.Subscribe(result => Console.WriteLine(result));
Console.ReadKey();