构建RX运算符,将键值对中的一个可观测值拆分为多个可观测量,每个不同的键一个
本文关键字:一个 可观 测量 拆分 键值对 构建 运算符 RX | 更新日期: 2023-09-27 18:29:45
我是一个RX新手,试图构建一些对我来说似乎很复杂的东西。
问题是:我有一个热门的可观察对象,它正在生成键值对,比如<int,foo>。他们来的顺序并不特别。输出应该是源可观察项中每个不同键的可观察项。因此,对于源中的每个值,结果应该是:如果键还没有被看到,则产生一个新的可观测值,新的可观察值立即产生相应的值。如果键以前被看到过,那么相应的值应该由已经存在的相应的可观测值产生。因此:
<1, foo>-<2, foo>-<2, foo>-<1, foo>-<3, foo>-<4, foo>-<1, foo>-<3, foo>x
Output:
<1, foo>-------------------<1, foo>-------------------<1, foo>---------x
---------<2, foo>-<2,foo>----------------------------------------------x
------------------------------------<3, foo>-------------------<3, foo>x
---------------------------------------------<4, foo>------------------x
我正在尝试用window来实现这一点,但我一直纠结于如何"检测"已经看到的键,然后将值"路由"到现有的可观察值。
谢谢!
这正是GroupBy
所做的。例如:
var subject = new Subject<KeyValuePair<int, string>>();
var groups = subject.GroupBy(x => x.Key);
GroupBy
采用keySelector函数,从元素中选择键。输出是流的流,其中发射的每个流具有类型IGroupedObservable<int, KeyValuePair<int, string>>
。这是IObservable的一个子类,它为您提供了Key
属性,因此您也可以识别所选的键。
因此,如果在上面的例子中添加以下代码,我们只需打印出每个新流的密钥:
groups.Subscribe(x => Console.WriteLine(x.Key));
subject.OnNext(new KeyValuePair<int, string>(1, "a"));
subject.OnNext(new KeyValuePair<int, string>(2, "b"));
subject.OnNext(new KeyValuePair<int, string>(1, "c"));
我们得到:
1
2
这是产生的两个流的关键。两个证明我们拥有所有的值,如果我们使用SelectMany
来压平GroupBy
的结果,并只提取值:
groups.SelectMany(x => x).Subscribe(y => Console.WriteLine(y.Value));
subject.OnNext(new KeyValuePair<int, string>(1, "a"));
subject.OnNext(new KeyValuePair<int, string>(2, "b"));
subject.OnNext(new KeyValuePair<int, string>(1, "c"));
我们得到的输出:
a
b
c
正如预期的那样。