构建RX运算符,将键值对中的一个可观测值拆分为多个可观测量,每个不同的键一个



我是一个RX新手,试图构建一些对我来说似乎很复杂的东西。

问题是:我有一个热门的可观察对象,它正在生成键值对,比如<int,foo>。他们来的顺序并不特别。输出应该是源可观察项中每个不同键的可观察项。因此,对于源中的每个值,结果应该是:如果键还没有被看到,则产生一个新的可观测值,新的可观察值立即产生相应的值。如果键以前被看到过,那么相应的值应该由已经存在的相应的可观测值产生。因此:

<1, foo>-<2, foo>-<2, foo>-<1, foo>-<3, foo>-<4, foo>-<1, foo>-<3, foo>x
Output:
<1, foo>-------------------<1, foo>-------------------<1, foo>---------x
---------<2, foo>-<2,foo>----------------------------------------------x
------------------------------------<3, foo>-------------------<3, foo>x
---------------------------------------------<4, foo>------------------x

我正在尝试用window来实现这一点,但我一直纠结于如何"检测"已经看到的键,然后将值"路由"到现有的可观察值。

谢谢!

这正是GroupBy所做的。例如:

var subject = new Subject<KeyValuePair<int, string>>();
var groups = subject.GroupBy(x => x.Key);

GroupBy采用keySelector函数,从元素中选择键。输出是流的流,其中发射的每个流具有类型IGroupedObservable<int, KeyValuePair<int, string>>。这是IObservable的一个子类,它为您提供了Key属性,因此您也可以识别所选的键。

因此,如果在上面的例子中添加以下代码,我们只需打印出每个新流的密钥:

groups.Subscribe(x => Console.WriteLine(x.Key));
subject.OnNext(new KeyValuePair<int, string>(1, "a"));
subject.OnNext(new KeyValuePair<int, string>(2, "b"));
subject.OnNext(new KeyValuePair<int, string>(1, "c"));

我们得到:

1
2

这是产生的两个流的关键。两个证明我们拥有所有的值,如果我们使用SelectMany来压平GroupBy的结果,并只提取值:

groups.SelectMany(x => x).Subscribe(y => Console.WriteLine(y.Value));
subject.OnNext(new KeyValuePair<int, string>(1, "a"));
subject.OnNext(new KeyValuePair<int, string>(2, "b"));
subject.OnNext(new KeyValuePair<int, string>(1, "c"));

我们得到的输出:

a
b
c

正如预期的那样。

最新更新