将生成的事件序列创建为冷序列

本文关键字:创建 事件 | 更新日期: 2023-09-27 18:10:51

FWIW -在询问了meta

的建议后,我将放弃这个问题的先前版本,以相同的方式支持不同的版本。

我有一个包含配置数据的web服务。我希望定期调用它Tok,以便刷新使用它的应用程序中的配置数据。如果服务出错(超时,down等),我想保留以前调用的数据,并在不同的时间间隔Tnotok后再次调用服务。最后,我希望行为是可测试的。

由于管理时间序列和可测试性似乎是响应式扩展的强项,我开始使用一个可观察对象,它将由生成的序列提供。下面是我创建序列的方法:

Observable.Generate<DataProviderResult, DataProviderResult>(
    // we start with some empty data
    new DataProviderResult() { 
            Failures = 0
            , Informations = new List<Information>()},
    // never stop
    (r) => true,
    // there is no iteration
    (r) => r,
    // we get the next value from a call to the webservice
    (r) => FetchNextResults(r),
    // we select time for next msg depending on the current failures
    (r) => r.Failures > 0 ? tnotok : tok,
    // we pass a TestScheduler
    scheduler)
.Suscribe(r => HandleResults(r));

我现在有两个问题:


看起来我正在创建一个热门的可观察对象。即使尝试使用Publish/Connect,我所描述的动作也错过了第一个事件。如何将其创建为冷可观察对象?

myObservable = myObservable.Publish();
myObservable.Suscribe(r => HandleResults(r));
myObservable.Connect() // doesn't call onNext for first element in sequence

当我订阅时,订阅和生成的顺序似乎不一致,因为对于任何帧,订阅方法在FetchNextResults方法之前触发。这正常吗?我希望序列调用frame f的方法,而不是f+1。下面是我用来获取和订阅的代码:

private DataProviderResult FetchNextResults(DataProviderResult previousResult)
{
    Console.WriteLine(string.Format("Fetching at {0:hh:mm:ss:fff}", scheduler.Now));
    try
    {
        return new DataProviderResult() { Informations = dataProvider.GetInformation().ToList(), Failures = 0};
    }
    catch (Exception)
    {}
    previousResult.Failures++;
    return previousResult;
}
private void HandleResults(DataProviderResult result)
{
    Console.WriteLine(string.Format("Managing at {0:hh:mm:ss:fff}", scheduler.Now));
    dataResult = result;
}

这是我所看到的,促使我提出这些问题:

Starting at 12:00:00:000
Fetching at 12:00:00:000 < no managing the result that has been fetched here
Managing at 12:00:01:000 < managing before fetching for frame f
Fetching at 12:00:01:000
Managing at 12:00:02:000
Fetching at 12:00:02:000

编辑:这里有一个简单的复制-粘贴程序来说明这个问题。

/*using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;*/
private static int fetchData(int i, IScheduler scheduler)
{
  writeTime("fetching " + (i+1).ToString(), scheduler);
  return i+1;
}
private static void manageData(int i, IScheduler scheduler)
{
  writeTime("managing " + i.ToString(), scheduler);
}
private static void writeTime(string msg, IScheduler scheduler)
{
  Console.WriteLine(string.Format("{0:mm:ss:fff} {1}", scheduler.Now, msg));
}
private static void Main(string[] args)
{
    var scheduler = new TestScheduler();
    writeTime("start", scheduler);
    var datas = Observable.Generate<int, int>(fetchData(0, scheduler),
                                                (d) => true,
                                                (d) => fetchData(d, scheduler),
                                                 (d) => d,
                                                 (d) => TimeSpan.FromMilliseconds(1000),
                                                 scheduler)
                                                 .Subscribe(i => manageData(i, scheduler));
    scheduler.AdvanceBy(TimeSpan.FromMilliseconds(3000).Ticks);
}

输出如下内容:

00:00:000 start
00:00:000 fetching 1
00:01:000 managing 1
00:01:000 fetching 2
00:02:000 managing 2
00:02:000 fetching 3

我不明白为什么第一个元素的管理没有在抓取后立即拾取。在有效提取数据的序列和将数据传递给观察者之间有一秒钟的时间。是我遗漏了什么,还是这是预期行为?如果是这样,是否有办法让观察者立即对新值做出反应?

将生成的事件序列创建为冷序列

您误解了timeSelector参数的用途。每次生成一个值时调用它,它返回一个时间,该时间指示在将该值传递给观察者并生成下一个值之前需要延迟多长时间。

这里有一个非生成的方法来解决你的问题。

private DataProviderResult FetchNextResult()
{
    // let exceptions throw
    return dataProvider.GetInformation().ToList();
}
private IObservable<DataProviderResult> CreateObservable(IScheduler scheduler)
{
    // an observable that produces a single result then completes
    var fetch = Observable.Defer(
        () => Observable.Return(FetchNextResult));
    // concatenate this observable with one that will pause
    // for "tok" time before completing.
    // This observable will send the result
    // then pause before completing.
    var fetchThenPause = fetch.Concat(Observable
        .Empty<DataProviderResult>()
        .Delay(tok, scheduler));
    // Now, if fetchThenPause fails, we want to consume/ignore the exception
    // and then pause for tnotok time before completing with no results
    var fetchPauseOnErrors = fetchThenPause.Catch(Observable
        .Empty<DataProviderResult>()
        .Delay(tnotok, scheduler));
    // Now, whenever our observable completes (after its pause), start it again.
    var fetchLoop = fetchPauseOnErrors.Repeat();
    // Now use Publish(initialValue) so that we remember the most recent value
    var fetchLoopWithMemory = fetchLoop.Publish(null);
    // YMMV from here on.  Lets use RefCount() to start the
    // connection the first time someone subscribes
    var fetchLoopAuto = fetchLoopWithMemory.RefCount();
    // And lets filter out that first null that will arrive before
    // we ever get the first result from the data provider
    return fetchLoopAuto.Where(t => t != null);
}
public MyClass()
{
    Information = CreateObservable();
}
public IObservable<DataProviderResult> Information { get; private set; }

Generate产生冷可观察序列,所以这是我的第一个警钟。

我试着把你的代码拉到linqpad*并运行它,并稍微改变了一下,以专注于问题。在我看来,你有迭代器和ResultSelector函数混淆。这些是背对着的。当您进行迭代时,您应该从上一次迭代中获取值,并使用它来生成下一个值。结果选择器用于从你正在迭代的实例中挑选(选择)值。

所以在你的例子中,你要迭代的类型就是你想要产生值的类型。因此,将ResultSelector函数保留为标识函数x=>x,而IteratorFunction应该是调用WebService的函数。

Observable.Generate<DataProviderResult, DataProviderResult>(
    // we start with some empty data
    new DataProviderResult() { 
            Failures = 0
            , Informations = new List<Information>()},
    // never stop
    (r) => true,
    // we get the next value(iterate) by making a call to the webservice
    (r) => FetchNextResults(r),
    // there is no projection
    (r) => r,
    // we select time for next msg depending on the current failures
    (r) => r.Failures > 0 ? tnotok : tok,
    // we pass a TestScheduler
    scheduler)
.Suscribe(r => HandleResults(r));

作为旁注,尽量选择不可变类型,而不是在迭代时改变值。

*请提供一个自主工作的代码片段,这样人们可以更好地回答你的问题。: -)