根据异步处理程序是否仍然繁忙来调节IObservable

本文关键字:调节 IObservable 异步 处理 程序 是否 | 更新日期: 2023-09-27 18:03:17

我有一个IObservable,每秒生成一个值,然后是一个选择,运行代码,可能需要一些时间:

var events = Observable.Interval(TimeSpan.FromSeconds(1));
ssoInfoObservable = events
    .Select(async e =>
    {
        Console.Out.WriteLine("Select   : " + e);
        await Task.Delay(4000);
        return e;
    })
    .SelectMany(t => t.ToObservable())
    .Subscribe(l => Console.WriteLine("Subscribe: " + l));

在我的示例中,长时间运行的操作需要4秒。当Select内的代码正在运行时,我不希望从Interval生成另一个值。我该如何做到这一点?这可能吗?也许可以使用特定的IScheduler实现?

注意,如果没有async代码,一切都像这里描述的那样工作。

这个问题和我之前问的非常相似,除了async/await .

根据异步处理程序是否仍然繁忙来调节IObservable

查看创建异步生成函数的示例。你需要一个时间偏移量,而不需要iterate,所以它看起来更像这样:

    public static IObservable<T> GenerateAsync<T>(TimeSpan span, 
                                                  Func<int, Task<T>> generator, 
                                                  IScheduler scheduler = null)
    {
        scheduler = scheduler ?? Scheduler.Default;
        return Observable.Create<T>(obs =>
        {
            return scheduler.Schedule(0, span, async (idx, recurse) =>
            {
                obs.OnNext(await generator(idx));
                recurse(idx + 1, span);
            });
        });
    }

用法:

  Extensions.GenerateAsync(TimeSpan.FromSeconds(1), idx => /*Async work, return a task*/, scheduler);

作为可能的第二种选择,您可以将switchFirst的实现移植到c#中。SwitchFirst将订阅它接收到的第一个可观察对象,并忽略后续的可观察对象,直到当前订阅完成。

如果你采用这种方法,你可以这样写:

Observable.Interval(TimeSpan.FromSeconds(1))
          .Select(e => Observable.FromAsync(() => /*Do Async stuff*/)
          .SwitchFirst()
          .Subscribe();