我正在学习RX,在RX简介网站上看到了一个我有问题的样本。下面是一个例子,它实现了与具有计数扩展方法的窗口相同的功能:
public static IObservable<IObservable<T>> MyWindow<T>(
this IObservable<T> source,
int count)
{
var shared = source.Publish().RefCount();
var windowEdge = shared
.Select((i, idx) => idx % count)
.Where(mod => mod == 0)
.Publish()
.RefCount();
return shared.Window(windowEdge, _ => windowEdge);
}
我理解var shared = source.Publish().RefCount()
行与窗口边缘查询"共享"源的目的。我不明白的是为什么windowEdge
查询也是用.Publish().RefCount()
定义的?有人能帮我理解为什么这是必要的吗?
好问题!
答案很长
除了性能原因外,windowEdge被引用计数的原因与Select
的使用有关。
在本例中,Select
使用索引参数(idx
),为每个新订阅者唯一确定谁的值。因此,如果我们不引用计数windowEdge
,则每个新订户将在下一个项目产生时接收一个事件,因为mod == 0
将始终为真。
这意味着在没有引用计数的情况下,每个窗口将恰好由两个值组成(假设没有引入其他竞争条件)。示例:
当第一个事件触发时,我们创建一个新窗口并输入事件,在这一点上,我们还使用窗口关闭选择器来获得一个可观察的结果,该结果将在窗口应该关闭时产生。下一个事件将触发,并发送到当前窗口。该事件恰好也是发送给我们的窗口关闭可观察的第一个事件(因为mod == 0
总是真的)。窗口关闭的可观察对象现在已经启动,窗口关闭,留给我们的窗口正好包含两个元素。重复
TLDR
对windowEdge
使用引用计数是必要的,以确保我们在每个"MyWindow"可观察到的情况下只增加idx
一次。