rxjs激发并行请求,并等待两个请求都完成,并在其中一个请求发出后进行更新



我的要求是我有两个端点,一个是HondaCars的上市,另一个是PeugoetCars的上市。当我使用combineLatest运算符时,它不会等待另一个流完成;

这里我的要求=>第一次等待,直到两个源流都接收到一个值,然后在刷新之后,两个流中的一个都接收到值

编辑

所以经过一段时间的搜索,我只是坚持使用combineLatest,但诀窍就在这里:

this.loading = Observable.combineLatest(
myFirstObservableIndicatingIfFirstHttpReqEnded,
mySecondObservableIndicatingIfSecondHttpReqEnded
).map(d => d.some(t => t === true));

因此,现在,用户只看到微调器,直到所有请求都完成,即使两个可观测值中的一个已经发出了一些值。

为了澄清,我有两个loadingReducer,它们处理每个可观察对象的加载逻辑,所以在我的情况下,我这样做是为了知道两个http调用何时完成

您需要首先使用Observable.forkJoin(),然后使用.switchMap()将可观察到的切换到Observable.combineLatest():

let combined = Observable
.forkJoin([myFirstObservableIndicatingIfFirstHttpReqEnded, mySecondObservableIndicatingIfSecondHttpReqEnded])
.switchMap(joinedResults => {
console.log('forkJoined results!', joinedResults);
this.hasReqEnded = joinedResults.some(t => t === true);
return Observable.combineLatest(joinedResults)
});
combined.subscribe(d => this.hasReqEnded = d.some(t => t === true));

注意this.hasReqEnded只是我创建的一些伪变量。诀窍是,您需要将this.hasReqEnded的值写两次:一次在switchMap(在forkJoin之后(,另一次在subscribe(在Observable.combineLastest()之后(。

或者,您可以有两个Observable:一个是forkJoin,另一个是combineLatest(),您可以订阅这两个Observer并相应地更新您的变量。

这是一个正在工作的JSBin

Http调用是有限流:它发出值并完成。因此,第一部分:"第一次等待,直到两个源流都收到一个值">

const initial$ = Rx.Observable.forkJoin(
this.httpService1.get(),
this.httpService2.get()
);

第二部分:"在两者中的一个接收到值之后进行刷新"。

我们需要将一些无限流(例如,一些用户操作(转换为"本田汽车"或"标致汽车"的流,这可以通过".switchMap":来完成

const stream1$ = someUserAction1$
.switchMap(()=>this.httpService1.get());

所以,如果我们想要一个<[本田汽车,标致汽车]>在一段时间后立即排放:

Rx.Observable.combineLatest(
someUserAction1$
.startWith(null) // we need to make initial call, but we have no user action yet, fix it )
.switchMap(()=>this.httpService2.get())$,
someUserAction2$
.startWith(null)
.switchMap(()=>this.httpService2.get())$
);

最新更新