我需要一些类似combineLatest的东西,但当任何可观察到的东西都会发射



所以我的代码是:

combineLatest([obs1, obs2]).subscribe((x => {
console.log(x)
})

当任一可观察到的发射时,应触发log语句。然而,正如文档中所描述的,有一个棘手的问题:

combineLatest不会发出初始值,直到每个可观察到的对象发出至少一个值

我需要它发射,即使其中一个原始可观察器从未发射过。我该怎么做?

每个可观察的典型使用startWith

combineLatest([
obs1.pipe(startWith(null)),
obs2.pipe(startWith(null)]
).subscribe((x => {
console.log(x)
})

您可以使用Merge方法。

merge(obs1, obs2).subscribe(x => {
console.log(x);
});

下面是一个如何实现它的简单示例(而不是向列表中的每个Observable添加pipe(startWith(null))(。

// overloading:
function combineLatestAny<O1 extends ObservableInput<any>>(sources: [O1]): Observable<[ObservedValueOf<O1>]>;
function combineLatestAny<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>>(sources: [O1, O2]): Observable<[ObservedValueOf<O1>, ObservedValueOf<O2>]>;
function combineLatestAny<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>>(sources: [O1, O2, O3]): Observable<[ObservedValueOf<O1>, ObservedValueOf<O2>, ObservedValueOf<O3>]>;
function combineLatestAny<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>>(sources: [O1, O2, O3, O4]): Observable<[ObservedValueOf<O1>, ObservedValueOf<O2>, ObservedValueOf<O3>, ObservedValueOf<O4>]>;
function combineLatestAny<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>, O5 extends ObservableInput<any>>(sources: [O1, O2, O3, O4, O5]): Observable<[ObservedValueOf<O1>, ObservedValueOf<O2>, ObservedValueOf<O3>, ObservedValueOf<O4>, ObservedValueOf<O5>]>;
function combineLatestAny<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>, O5 extends ObservableInput<any>, O6 extends ObservableInput<any>>(sources: [O1, O2, O3, O4, O5, O6]): Observable<[ObservedValueOf<O1>, ObservedValueOf<O2>, ObservedValueOf<O3>, ObservedValueOf<O4>, ObservedValueOf<O5>, ObservedValueOf<O6>]>;
function combineLatestAny<O extends ObservableInput<any>>(sources: O[]): Observable<ObservedValueOf<O>[]>;
// implementation:
function combineLatestAny(sources) {
return new Observable<any>(observer => {
const subscribtion = new Subscription();
const values = new Array(sources.length);
let active: number = sources.length;
sources.forEach((source, index) => subscribtion.add(
source.subscribe({
next: value => { values[index] = value; observer.next(values) },
error: error => { observer.error(error); subscribtion.unsubscribe() },
// only complete when all input observables have completed
complete: () => (--active === 0) && observer.complete() 
})
))
return {
unsubscribe: () => subscribtion.unsubscribe()
}
})
}

用法:

combineLatestAny([timer(1000), timer(3000).pipe(mapTo("string")), timer(2000).pipe(mapTo(true))])
.subscribe(
{
next: console.log,
error: console.error,
complete: () => console.log("completed")
}
)

相关内容

最新更新