如何在IObservables的IObservable上等待

本文关键字:IObservable 等待 IObservables | 更新日期: 2024-10-20 14:31:39

我有一个对象的IObservable,我想异步地将其转换为列表的字典。

这是我的代码,GetSource返回一个IObservable:

await GetSource(...)
  .GroupBy(o => o.SiteId)
  .ToDictionary(g => g.Key, g => g.ToList())

当然,这是错误的,因为结果是IDictionary<int, IObservable<IList<T>>>,但我需要一个IDictionary<int, IList<T>>

从本质上讲,我需要await每个IObservable<IList<T>>,但我不知道如何优雅地做到这一点,因为我相信Rx是可能的。NET。

有什么想法吗?

如何在IObservables的IObservable上等待

请注意,无论您做什么,在源代码完成之前,您永远无法完成字典。因此,一个简单的方法是异步获取如下列表:

await GetSource(...).ToList();

然后继续处理生成的IList<T>,就像处理GroupByToDictionary一样——注意,您现在正在针对一个完整的列表使用这些运算符的IEnumerable<T>实现。

该方法假设GetSource()中的事件的数量和时间是可观察的,并且组之间的分布使得在完成的流上计算组和字典的费用不会过高。由于最终返回的是一个完整的字典,无论如何都会保存在内存中,因此我们可能只考虑分组排序和创建字典条目的成本。与长事件流的串行化交付相比,这将在内存中数据上非常快,所以我倾向于认为这种方法在少数情况下不太好。

如果它令人望而却步,那么可能值得同时建立每个组和列表,然后事件到达,在这种情况下,您可以这样做:

await GetSource(...).GroupBy(x => x.SiteId)
                    .SelectMany(x => x.ToList())
                    .ToDictionary(x => x[0].SiteId);

请注意,ToDictionary keySelector中列表的第一个元素将始终存在(否则将不会具体化任何组),因此这是安全的。这看起来确实有点奇怪,但这是我能想到的最简单的拔出钥匙的方法。

或者作为通用函数:

async Task<IDictionary<TKey, IList<T>>> ToDictionaryOfLists<T, TKey>(
    IObservable<T> source,
    Func<T, TKey> keySelector)
{        
    return await source.GroupBy(keySelector)
                       .SelectMany(x => x.ToList())
                       .ToDictionary(x => keySelector(x[0]));           
}

假设一个类别:

public class Site
{
    public int SiteId { get; set; }
}

你可以使用类似:

var result = ToDictionaryOfLists(GetSource(...), x=> x.SiteId);

看起来您实际上并不需要Dictionary,而是需要Lookup<TKey,TElement>

表示键的集合,每个键映射到一个或多个值。

通过使用内置的RX运算符ToLookup,创建Lookup是很简单的。通过使用标准LINQ运算符ToDictionary,将Lookup转换为列表的Dictionary也是微不足道的。以下是这两种运算符组合在一个扩展方法中:

public static async Task<Dictionary<TKey, List<TSource>>> ToDictionaryOfLists<TSource, TKey>(
    IObservable<TSource> source, Func<TSource, TKey> keySelector)
{
    var lookup = await source.ToLookup(keySelector);
    return lookup.ToDictionary(g => g.Key, g => g.ToList());
}

编辑:我刚刚了解到返回Select(…)+Merge()的Task可以用SelectMany(…)代替,所以更好的方法是:

IDictionary<int, IList<Site>> result = await GetSource()
.GroupBy(o => o.SiteId)
.SelectMany(async group => (group.Key, List: await group.ToList()))
.ToDictionary(group => group.Key, group => group.List);

原件:

我刚开始学习RX,但也许这样的东西已经足够好了:

IDictionary<int, IList<Site>> result = await GetSource()
.GroupBy(o => o.SiteId)
.Select(async group => (group.Key, List: await group.ToList()))
.Merge()
.ToDictionary(group => group.Key, group => group.List);

假设站点实现类似:

public class Site
{
   public int SiteId { get; set; }
   //rest of class
}