如何在RxJS中完成Observable



假设我们有一个Observable:

var observable = Rx.Observable
    .fromEvent(document.getElementById('emitter'), 'click');

如何使其完成(什么将触发所有订阅的观察者的onComplete事件)?

在目前的形式中,您不能。你的可观察是从一个不完整的来源衍生出来的,所以它本身也不完整。你可以做的是用一个完备的条件来扩展这个源。这将类似于:

var end$ = new Rx.Subject();
var observable = Rx.Observable
    .fromEvent(document.getElementById('emitter'), 'click')
    .takeUntil(end$);

如果要结束observable,则执行end$.onNext("anything you want here");。在这种情况下,结束事件是由您生成的。如果这是生成该事件的另一个源(按键等),则可以直接将从该源派生的可观测值作为takeUntil的参数。

文件:

  • http://reactivex.io/documentation/operators/takeuntil.html
  • https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/takeuntil.md

对我有用的是使用take()运算符。它将在x个事件之后触发完整的回调。因此,通过1,它将在第一个事件之后完成。

打字:

private preloadImage(url: string): Observable<Event> {
    let img = new Image();
    let imageSource = Observable.fromEvent(img, "load");
    img.src = url;
    return imageSource.take(1);
}

我认为您正在寻找的是dispose()方法。

发件人:https://github.com/Reactive-Extensions/RxJS/blob/master/doc/gettingstarted/creating.md#cold-与热可观测

请注意,subscribe方法返回一个Disposable,这样您就可以取消订阅序列并轻松处理它。当您对可观察序列调用dispose方法时,观察者将停止侦听可观察的数据。通常,您不需要显式调用dispose,除非您需要提前取消订阅,或者当源可观察序列的寿命比观察者更长时。Rx中的订阅是为即发即弃场景设计的,不使用终结器。请注意,Observable运算符的默认行为是尽快处理订阅(即,在发布onCompleted或onError消息时)。例如,代码将为序列a和b订阅x。如果a抛出错误,x将立即从b取消订阅。

我为我的用例找到了一种更简单的方法,如果你想在可观察到的东西完成时做一些事情,那么你可以使用这个:

const subscription$ = interval(1000).pipe(
  finalize(() => console.log("Do Something")),
).subscribe();

完成时,当所有订阅都取消订阅时,会触发finalize。

最新更新