Reactive Extensions应该评估其各种运算符多少次?
我有以下测试代码:
var seconds = Observable
.Interval(TimeSpan.FromSeconds(5))
.Do(_ => Console.WriteLine("{0} Generated Data", DateTime.Now.ToLongTimeString()));
var split = seconds
.Do(_ => Console.WriteLine("{0} Split/Branch received Data", DateTime.Now.ToLongTimeString()));
var merged = seconds
.Merge(split)
.Do(_ => Console.WriteLine("{0} Received Merged data", DateTime.Now.ToLongTimeString()));
var pipeline = merged.Subscribe();
我希望它每五秒钟写一次"生成的数据"。然后,它将数据交给写入"split/Branch received data"的"split"流和写入"received merged data"的"merged"流。最后,因为"合并"流也从"拆分"流接收,所以它第二次接收数据,并第二次写入"接收的合并数据"。(它写入其中一些内容的顺序并不特别相关)
但我得到的结果是:
8:29:56 AM Generated Data
8:29:56 AM Generated Data
8:29:56 AM Split/Branch received Data
8:29:56 AM Received Merged data
8:29:56 AM Received Merged data
8:30:01 AM Generated Data
8:30:01 AM Generated Data
8:30:01 AM Split/Branch received Data
8:30:01 AM Received Merged data
8:30:01 AM Received Merged data
它正在写两次"生成数据"。据我所知,订阅"秒"IObservable的下游观察者的数量不应影响"生成的数据"写入的次数(应为ONCE),但确实如此。为什么?
注意我在.Net Framework 3.5环境中使用反应式扩展的稳定版本v1.0 SP1。
据推测,他们选择这种方法是为了允许每个订阅者从初始订阅开始以相同的间隔获取其值。考虑一下你的备用间隔是如何工作的:
0s - First subscriber subscribes
5s - Value: 0
8s - Second subscriber subscribes
10s - Value: 1
15s - Value: 2
17s - Unsubscribe both
你最终得到的是这样的东西:
First -----0----1----2-|
Second --1----2-|
在这种情况下,两个观测者的结果明显不同,这取决于是否有其他观测者已经加入。在实现时,Interval
为每个订户提供相同的体验,而与订单或过去的订户无关。
尽管如此,您可以通过在创建seconds
可观察对象时添加.Publish().RefCount()
,将Interval
"转换"为您描述的行为。
虽然有时如果在每一步都多播序列可能会很好,但如果是这样的话,就不允许您拥有Rx允许的丰富组合。
换个角度来看,CCD_ 5是CCD_ 6的基于推送的对偶。IEnumerable
具有延迟求值的特性——在开始遍历Enumerator
之前,不会计算值。Rx序列是惰性地组成的,最后一个Subscribe()(For Each的Observable等价物)实现了该序列。
通过这种方式,只需从最后一个阶段取消订阅,就可以在所有阶段停止管道,这样就可以在不经历管理单个订阅的噩梦的情况下进行解雇和遗忘行为。
在一个相关的注释中,这里有一个脑筋急转弯,展示了Asti与延迟评估的可枚举序列的类比:
private static Random s_rand = new Random();
public static IEnumerable<int> Rand()
{
while (true)
yield return s_rand.Next();
}
public static void Main()
{
var xs = Rand();
var res = xs.Zip(xs, (l, r) => l == r).All(b => b);
Console.WriteLine(res);
}
如果你用自己压缩一个随机序列,你希望所有的元素对都是一样的吗(即导致上面的代码永远运行)?或者,由于某种原因,您是否希望代码终止并打印为false?
(创建类似的可观察代码留给读者练习。)
从面向对象的角度来看,考虑流以定义Observables
/Enumerables
的接口为基础是正常的。如果您可以忽略Enumerator上定义了一个名为Reset的方便方法这一事实,那么Enumerables在功能上就是f -> g -> value?
。枚举器本质上是一个您调用以获取枚举器的函数,它本质上是您一直调用的函数,直到不再返回值为止。
类似地,Observable可以简单地定义为f(g) -> g(h) -> h(value?)
——它是一个函数,当有值时,您需要调用它。
这就是为什么将可枚举或可观测描述为以某种方式定义的函数集之外的任何东西都没有意义——契约是为了确保组成计算的能力。
无论它们是活的、缓存的还是懒惰的,都是可以在其他地方抽象的实现细节——虽然我当然不反对这些细节很重要,但更重要的是关注它的功能性质
作为数据库查询或目录列表的序列具有与预先计算的值集(如数组)相同的IEnumerable
接口。这取决于最终使用序列的代码来进行区分。如果你能习惯这是一种组成高阶函数的方法,你会发现使用Rx或Ix对问题建模更容易。