如何在Rx中实现DistinctLatest(和缓存)操作符?
本文关键字:缓存 操作符 DistinctLatest Rx 实现 | 更新日期: 2023-09-27 18:18:21
我有一个问题a缓存服务更新和新值为" DistinctLatest ",订阅时缓存内容完整,社区处理得很好。有人提出了一个问题,即缓存和替换上述问题中定义的值的实际目标可以用.DistinctLatest
操作符定义。
OK !似乎很少有人谈论这样一个运营商。在搜索和思考时,我发现ReactiveX: Group和Buffer在每个组中只有最后一个项目,这有点接近。为了模拟最初的问题,我尝试将缓存操作符写成
/// <summary>
/// A cache that keeps distinct elements where the elements are replaced by the latest.
/// </summary>
/// <typeparam name="T">The type of the result</typeparam>
/// <typeparam name="TKey">The type of the selector key for distinct results.</typeparam>
/// <param name="newElements">The sequence of new elements.</param>
/// <param name="seedElements">The seed elements when the cache is started.</param>
/// <param name="replacementSelector">The replacement selector to choose distinct elements in the cache.</param>
/// <returns>The cache contents upon first call and changes thereafter.</returns>
public static IObservable<T> Cache<T, TKey>(this IObservable<T> newElements, IEnumerable<T> seedElements, Func<T, TKey> replacementSelector)
{
var s = newElements.StartWith(seedElements).GroupBy(replacementSelector).Select(groupObservable =>
{
var replaySubject = new ReplaySubject<T>(1);
groupObservable.Subscribe(value => replaySubject.OnNext(value));
return replaySubject;
});
return s.SelectMany(i => i);
}
但是做测试似乎也不起作用。看起来,如果在开始时订阅,则会观察到初始值和更新(以及新值)。如果最后订阅了一个,则只记录替换的种子值。
现在,我想知道一个通用的DistinctLast
算子,我认为这一点,但它不起作用,然后这个"缓存"添加的是种子值和组的平坦化,但这不是测试告诉的。我也尝试了一些分组和.TakeLast()
的事情,但没有骰子。
如果有人对此有指点或思考,我将很高兴,希望这是一些普遍有益的东西。
@LeeCampbell做了大部分的工作。参见其他参考问题。总之,下面是代码:
public static class RxExtensions
{
public static IObservable<T> DistinctLatest<T, TKey>(this IObservable<T> newElements, IEnumerable<T> seedElements, Func<T, TKey> replacementSelector)
{
return seedElements.ToObservable()
.Concat(newElements)
.GroupBy(i => replacementSelector)
.SelectMany(grp => grp.Replay(1).Publish().RefCount());
}
}