将嵌套的可观察对象转换为单个可观察对象
本文关键字:对象 观察 单个可 转换 嵌套 | 更新日期: 2023-09-27 18:05:45
我试图写一个可观察的辅助函数,将嵌套序列合并成一个序列。换句话说,签名看起来像这样:
public IObservable<string> CreateNested(
Func<IObservable<string>> createOuter,
Func<string, IObservable<string>> createInner);
一个细节是这些序列封装了服务调用,所以每个序列最多只有一个项。
所以我的第一次尝试成功了。但是对我来说,它看起来不必要的冗长,而且,Wait
的使用打破了可观察模式,因为来自任何序列的错误都会抛出异常,而不是传播回返回的序列:
public IObservable<string> CreateNested(Func<IObservable<string>> createOuter, Func<string, IObservable<string>> createInner)
{
return Observable.StartAsync(() =>
{
return Task.Factory.StartNew(() =>
{
string outerResult = createOuter().Wait();
var inner = createInner(outerResult);
return inner.Wait();
});
});
}
我的第二次尝试稍微好一点,但仍然使用Wait
。
public IObservable<string> CreateNested(Func<IObservable<string>> createOuter, Func<string, IObservable<string>> createInner)
return createOuter().FirstOrDefaultAsync()
.Select(result => createInner(result).Wait());
}
如果我用另一个"FirstOrDefaultAsync()"替换上面的"等待",那么我得到IObservable<IObservable<string>>
。是否有一种正确的方法来"合并"这两个序列?
编辑为了完整起见,我的测试如下(预期输出是"hello world")。
public class Tester
{
public void Test()
{
CreateNested(CreateOuter, CreateInner).Subscribe(Console.WriteLine);
}
private IObservable<string> CreateOuter()
{
return Observable.Create<string>(observer =>
{
Task.Factory.StartNew(() =>
{
Thread.Sleep(1000);
observer.OnNext("hello");
observer.OnCompleted();
});
return new Action(() => { Console.WriteLine("Outer subscriber released"); });
});
}
private IObservable<string> CreateInner(string key)
{
return Observable.Create<string>(observer =>
{
Task.Factory.StartNew(() =>
{
Thread.Sleep(1000);
observer.OnNext(key + " world");
observer.OnCompleted();
});
return new Action(() => { Console.WriteLine("Inner subscriber released"); });
});
}
private IObservable<string> CreateNested(Func<IObservable<string>> createOuter, Func<string, IObservable<string>> createInner)
{
// TODO
}
}
显然你所需要的就是用SelectMany代替Select。
如果您对单子感兴趣,请注意SelectMany是单子的"绑定"函数,它允许函数组合——这正是您在这里试图实现的。