Rx 如何按键对复杂对象进行分组,然后执行 SelectMany 而不“停止”流
本文关键字:SelectMany 执行 然后 而不 停止 何按键 复杂 对象 Rx | 更新日期: 2023-09-27 18:35:56
这与我在这里的另一个问题有关。James World提出了如下解决方案:
// idStream is an IObservable<int> of the input stream of IDs
// alarmInterval is a Func<int, TimeSpan> that gets the interval given the ID
var idAlarmStream = idStream
.GroupByUntil(key => key, grp => grp.Throttle(alarmInterval(grp.Key)))
.SelectMany(grp => grp.IgnoreElements().Concat(Observable.Return(grp.Key)));
<编辑2:>
问:如何在不等待第一个事件到达的情况下立即启动计时器?我想,这就是我问题的根本问题。为此,我计划发送带有我知道应该存在的ID的虚拟对象。但正如我下面所写的,我最终遇到了一些其他问题。尽管如此,我认为解决这个问题也会很有趣。
然后与其他有趣的部分一起前进!现在,如果我想像下面这样对一个复杂对象进行分组并按键分组(不会编译)
var idAlarmStream = idStream
.Select(i => new { Id = i, IsTest = true })
.GroupByUntil(key => key.Id, grp => grp.Throttle(alarmInterval(grp.Key)))
.SelectMany(grp => grp.IgnoreElements().Concat(Observable.Return(grp.Key)));
然后我就惹上麻烦了。我无法修改有关SelectMany
、Concat
和Observable.Return
的部分,以便查询像以前一样工作。例如,如果我将查询为
var idAlarmStream = idStream
.Select(i => new { Id = i, IsTest = true })
.GroupByUntil(key => key.Id, grp => grp.Throttle(alarmInterval(grp.Key)))
.SelectMany(grp => grp.IgnoreElements().Concat(Observable.Return(grp.Key.First())))
.Subscribe(i => Console.WriteLine(i.Id + "-" + i.IsTest);
然后需要两个事件才能在Subscribe
中观察到输出。这是呼唤First
的效果,我收集。此外,我也喜欢在调用alarmInterval
时使用复杂的对象属性。
有人可以提供正在发生的事情,甚至可能是解决方案吗?使用未修改的解决方案的问题在于,分组不仅会查找键值的 Id,还会查找 IsTest 字段。
<编辑:>请注意,这个问题可能首先可以通过创建一个显式类或结构来解决,然后实现自定义IEquatable
然后按原样使用 James 的代码,以便仅通过 ID 进行分组。不过感觉就像黑客。
此外,如果您想计算在闹钟响起之前看到某个项目的次数,您可以这样做,利用 Select
中的计数器过载。
var idAlarmStream = idStream
.Select(i => new { Id = i, IsTest = true })
.GroupByUntil(key => key.Id, grp => grp.Throttle(alarmInterval(grp.Key))
.SelectMany(grp => grp.Select((count, alarm) => new { count, alarm }).TakeLast(1));
请注意,对于第一个(种子)项目,这将是 0 - 这可能是您想要的。
您正在 Select 中创建匿名类型。我们称之为A1。我假设你的idStream是一个IObservable。由于这是GroupByUntil
中的键,因此您无需担心键比较 - int 相等很好。
GroupByUntil
是IObservable<IGroupedObservable<int, A1>>
.
所写的SelectMany试图成为一个IObservable<A1>
。您只需在此处Concat(Observable.Return(grp.Key))
- 但键的类型与组元素的类型必须匹配,否则 SelectMany 将不起作用。所以钥匙也必须是A1。匿名类型使用结构相等,返回类型将是 A1 流 - 但不能将其声明为公共返回类型。
如果你只想要 Id,你应该在Throttle
后添加一个.Select(x => x.Id)
:
var idAlarmStream = idStream
.Select(i => new { Id = i, IsTest = true })
.GroupByUntil(key => key.Id, grp => grp.Throttle(alarmInterval(grp.Key)
.Select(x => x.Id))
.SelectMany(grp => grp.IgnoreElements().Concat(Observable.Return(grp.Key)));
如果你想要 A1 代替 - 你需要创建一个实现相等的具体类型。
编辑
我没有测试过它,但你也可以像这样更简单地展平它,我认为这更容易!不过,它正在输出 A1,因此如果您需要在某处返回流,则必须处理这个问题。
var idAlarmStream = idStream
.Select(i => new { Id = i, IsTest = true })
.GroupByUntil(key => key.Id, grp => grp.Throttle(alarmInterval(grp.Key))
.SelectMany(grp => grp.TakeLast(1));