假设我们有一个Observable:
var observable = Rx.Observable
.fromEvent(document.getElementById('emitter'), 'click');
如何使其完成(什么将触发所有订阅的观察者的onComplete事件)?
在目前的形式中,您不能。你的可观察是从一个不完整的来源衍生出来的,所以它本身也不完整。你可以做的是用一个完备的条件来扩展这个源。这将类似于:
var end$ = new Rx.Subject();
var observable = Rx.Observable
.fromEvent(document.getElementById('emitter'), 'click')
.takeUntil(end$);
如果要结束observable
,则执行end$.onNext("anything you want here");
。在这种情况下,结束事件是由您生成的。如果这是生成该事件的另一个源(按键等),则可以直接将从该源派生的可观测值作为takeUntil
的参数。
文件:
- http://reactivex.io/documentation/operators/takeuntil.html
- https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/takeuntil.md
对我有用的是使用take()运算符。它将在x个事件之后触发完整的回调。因此,通过1,它将在第一个事件之后完成。
打字:
private preloadImage(url: string): Observable<Event> {
let img = new Image();
let imageSource = Observable.fromEvent(img, "load");
img.src = url;
return imageSource.take(1);
}
我认为您正在寻找的是dispose()
方法。
发件人:https://github.com/Reactive-Extensions/RxJS/blob/master/doc/gettingstarted/creating.md#cold-与热可观测
请注意,subscribe方法返回一个Disposable,这样您就可以取消订阅序列并轻松处理它。当您对可观察序列调用dispose方法时,观察者将停止侦听可观察的数据。通常,您不需要显式调用dispose,除非您需要提前取消订阅,或者当源可观察序列的寿命比观察者更长时。Rx中的订阅是为即发即弃场景设计的,不使用终结器。请注意,Observable运算符的默认行为是尽快处理订阅(即,在发布onCompleted或onError消息时)。例如,代码将为序列a和b订阅x。如果a抛出错误,x将立即从b取消订阅。
我为我的用例找到了一种更简单的方法,如果你想在可观察到的东西完成时做一些事情,那么你可以使用这个:
const subscription$ = interval(1000).pipe(
finalize(() => console.log("Do Something")),
).subscribe();
完成时,当所有订阅都取消订阅时,会触发finalize。