Rxjs:带takeUntil(计时器)的Observable在计时器勾选后持续发光



我遇到了takeUntil()的一个非常奇怪的行为。我创建了一个可观察的计时器:

let finish = Observable.timer(3000);

然后我等了一段时间,打电话给

// 2500 ms later
someObservable.takeUntil(finish);

我预计,在计时器"滴答"后,即在其创建后约500ms,所述可观测到的粒子将停止发射。事实上,它在创建后持续发射3000ms,远远超过计时器"滴答"的那一刻。如果我使用包含绝对时间值的Date对象创建计时器,则不会发生这种情况。

这是故意的吗?如果是,解释是什么?

以下是完整的代码,可与node.js一起运行(它需要npm install rx):

let {Observable, Subject} = require("rx")
let start = new Date().getTime();
function timeMs() { return new Date().getTime() - start };
function log(name, value) { 
console.log(timeMs(), name, value);
}
Observable.prototype.log = function(name) {
this.subscribe( v=>log(name,v), 
err=>log(name, "ERROR "+err.message), 
()=>log(name, "DONE"));
return this;
}
let finish = Observable.timer(3000).log("FINISH");
setTimeout( ()=>Observable.timer(0,500).takeUntil(finish).log("seq"), 2500);

这将生成以下输出:

2539 'seq' 0
3001 'FINISH' 0
3005 'FINISH' 'DONE'
3007 'seq' 1
3506 'seq' 2
4006 'seq' 3
4505 'seq' 4
5005 'seq' 5
5506 'seq' 6
5507 'seq' 'DONE'

如果我使用绝对时间创建计时器:

let finish = Observable.timer(new Date(Date.now()+3000)).log("FINISH");

然后它的行为如预期:

2533 'seq' 0
3000 'seq' 'DONE'
3005 'FINISH' 0
3005 'FINISH' 'DONE'

这种行为在各种情况下似乎相当一致。例如,如果您间隔一段时间并使用mergeMap()switchMap()创建子序列,结果将类似:子序列在完成事件之后继续发射。

想法?

您忘记了冷Observables的第一条规则:每个订阅都是一个新流。

您的log操作员有一个错误;它订阅一次Observable(从而创建第一个订阅),然后返回原始Observable,当您将其传递给takeUntil运算符时,它会隐式地再次订阅。因此,实际上您有两个活动的seq流,这两个流的行为都是正确的。

它在绝对情况下有效,因为您基本上是将每个流设置为在特定时间发出,而不是订阅发生时的相对时间。

如果你想看到它的工作,我建议你改变你的实现:

let start = new Date().getTime();
function timeMs() { return new Date().getTime() - start };
function log(name, value) { 
console.log(timeMs(), name, value);
}
Observable.prototype.log = function(name) {
// Use do instead of subscribe since this continues the chain
// without directly subscribing.
return this.do(
v=>log(name,v), 
err=>log(name, "ERROR "+err.message), 
()=>log(name, "DONE")
);
}
let finish = Observable.timer(3000).log("FINISH");
setTimeout(()=> 
Observable.timer(0,500)
.takeUntil(finish)
.log("seq")
.subscribe(), 
2500);

为了完整起见,这里的代码实际上实现了我想要的。通过使用Observable.publish().connect(),它创建了一个"热"计时器,该计时器立即开始计时,并为所有订户保持相同的时间。它还避免了"log"方法中不需要的订阅,正如@paulpdaniels所建议的那样。

警告:注意比赛条件。如果子序列在计时器勾选后开始,它将永远不会停止。为了演示,将最后一行的超时时间从2500更改为3500。

let {Observable, Subject, Scheduler, Observer} = require("rx")
let start = new Date().getTime();
function timeMs() { return new Date().getTime() - start };
function log(name, value) { 
console.log(timeMs(), name, value);
}
var logObserver =  function(name) {
return Observer.create( 
v=>log(name,v), 
err=>log(name, "ERROR "+err.message), 
()=>log(name, "DONE"));
}
Observable.prototype.log = function(name) { return this.do(logObserver(name)); }
Observable.prototype.start = function() { 
var hot = this.publish(); hot.connect(); 
return hot; 
}
let finish = Observable.timer(3000).log("FINISH").start();
setTimeout(()=> 
Observable.timer(0,500)
.takeUntil(finish)
.log("seq")
.subscribe(), 
2500);

输出为

2549 'seq' 0
3002 'FINISH' 0
3006 'seq' 'DONE'
3011 'FINISH' 'DONE'

最新更新