如何确保在反应式扩展中同步调用选择和订阅方法
本文关键字:选择 调用 同步 方法 扩展 何确保 确保 反应式 | 更新日期: 2023-09-27 18:34:00
我正在测试尝试测试 RX 并创建 Stream(),它提供两个相隔 1 秒的事件。
private IObservable<string> Stream()
{
return Observable.Create<string>
(
(IObserver<string> observer) =>
{
observer.OnNext("a");
observer.OnNext("b");
observer.OnCompleted();
return Disposable.Create(() => Console.WriteLine("Observer has unsubscribed"));
}
);
}
_refreshFiberStream =
Stream()
.SubscribeOn(schedulerProvider.EventLoop)
.Select(DoCalc)
.ObserveOn(schedulerProvider.Dispatcher)
.Subscribe(Update);
和调度提供程序
public sealed class SchedulerProvider : ISchedulerProvider
{
public IScheduler Dispatcher
{
get { return DispatcherScheduler.Current; }
}
public IScheduler EventLoop
{
get { return new EventLoopScheduler(); }
}
// ...
}
我看到 DoCalc 方法为每个输入调用两次,然后是调用两次的更新方法,DoCalc,DoCalc,更新,更新。相反,我试图先对DoCalc方法进行排序,然后是更新方法。重复第二个输入的序列,因此第二个输入可以建立在第一个输入的结果之上
,DoCalc,更新,DoCalc,更新任何想法
首先,我会遵循将 SubscribeOn/ObserveOn 对放在最终订阅方法之前的模式。这只会为您节省很多头发。
_refreshFiberStream = Stream()
.Select(DoCalc)
//Always in this pattern
.SubscribeOn(schedulerProvider.EventLoop) //SubscribeOn right before ObserveOn (or before Subscribe if no ObserveOn is used)
.ObserveOn(schedulerProvider.Dispatcher) //ObserveOn right before Subscribe
.Subscribe(Update); //Subscribe Last
接下来,如果我重新创建您的示例并添加缺少的代码,我会得到正确/预期的输出。
void Main()
{
var els = new EventLoopScheduler();
var dispatcher = new EventLoopScheduler();
Stream()
.SubscribeOn(els) //TODO: Move to line just before ObserveOn
.Select(DoCalc)
.ObserveOn(dispatcher)
.Subscribe(Update);
}
// Define other methods and classes here
private IObservable<string> Stream()
{
return Observable.Create<string>
(
(IObserver<string> observer) =>
{
observer.OnNext("a");
Thread.Sleep(1000);
observer.OnNext("b");
observer.OnCompleted();
return Disposable.Create(() => Console.WriteLine("Observer has unsubscribed"));
}
);
}
private string DoCalc(string val)
{
val.Dump("DoCalc()");
Thread.Sleep(1000);
return val;
}
private void Update(string val)
{
val.Dump("Update()");
}
因此,您可以重新发布带有输出的完整示例代码。将此作为单元测试、控制台应用或 LinqPad 示例执行。
猜测您的下一个问题,可能是:如何使用来自前一个值的投影(Select
)的值来帮助计算下一个值?如果需要一组正在运行的计算,请使用 Scan
方法,或者,如果只需要最终结果而不是计算的每个值,请使用 Aggregate
方法。