将嵌套的可观察对象转换为单个可观察对象

本文关键字:对象 观察 单个可 转换 嵌套 | 更新日期: 2023-09-27 18:05:45

我试图写一个可观察的辅助函数,将嵌套序列合并成一个序列。换句话说,签名看起来像这样:

public IObservable<string> CreateNested(
    Func<IObservable<string>> createOuter, 
    Func<string, IObservable<string>> createInner);

一个细节是这些序列封装了服务调用,所以每个序列最多只有一个项。

所以我的第一次尝试成功了。但是对我来说,它看起来不必要的冗长,而且,Wait的使用打破了可观察模式,因为来自任何序列的错误都会抛出异常,而不是传播回返回的序列:

public IObservable<string> CreateNested(Func<IObservable<string>> createOuter, Func<string, IObservable<string>> createInner)
{
    return Observable.StartAsync(() =>
    {
        return Task.Factory.StartNew(() =>
        {
            string outerResult = createOuter().Wait();
            var inner = createInner(outerResult);
            return inner.Wait();
        });
    });
}

我的第二次尝试稍微好一点,但仍然使用Wait

public IObservable<string> CreateNested(Func<IObservable<string>> createOuter, Func<string, IObservable<string>> createInner)
    return createOuter().FirstOrDefaultAsync()
        .Select(result => createInner(result).Wait());
}

如果我用另一个"FirstOrDefaultAsync()"替换上面的"等待",那么我得到IObservable<IObservable<string>>。是否有一种正确的方法来"合并"这两个序列?


编辑为了完整起见,我的测试如下(预期输出是"hello world")。

public class Tester
{
    public void Test()
    {
        CreateNested(CreateOuter, CreateInner).Subscribe(Console.WriteLine);
    }
    private IObservable<string> CreateOuter()
    {
        return Observable.Create<string>(observer =>
        {
            Task.Factory.StartNew(() =>
            {
                Thread.Sleep(1000);
                observer.OnNext("hello");
                observer.OnCompleted();
            });
            return new Action(() => { Console.WriteLine("Outer subscriber released"); });
        });
    }
    private IObservable<string> CreateInner(string key)
    {
        return Observable.Create<string>(observer =>
        {
            Task.Factory.StartNew(() =>
            {
                Thread.Sleep(1000);
                observer.OnNext(key + " world");
                observer.OnCompleted();
            });
            return new Action(() => { Console.WriteLine("Inner subscriber released"); });
        });
    }
    private IObservable<string> CreateNested(Func<IObservable<string>> createOuter, Func<string, IObservable<string>> createInner)
    {
        // TODO
    }
}

将嵌套的可观察对象转换为单个可观察对象

显然你所需要的就是用SelectMany代替Select。

如果您对单子感兴趣,请注意SelectMany是单子的"绑定"函数,它允许函数组合——这正是您在这里试图实现的。