RxJs:forkJoin() 没有运行,因为我的可观察量列表不完整



好吧,我有一个无法解决的问题。

我正在浏览XLSX文件中的数据列表。 对于文件的每一行,将向服务器发送两个请求:

  • 一个恢复时期
  • 另一个用于恢复用户

创建一个可观察的钱包以将其余数据存储在该行中。之后立即完成。 否则,循环将继续执行,并且列表中的最后一个数据将在forkjoin()中考虑。

所以我使用forkJoin()来等待仍在循环中的三个可观察量的结果。 完成三个可观察量后,将发送一个新请求。正是在这里阻止了他。新查询将添加到可观察量列表中。

并且我想仅在我的可观察量列表完成后再次使用forkJoin()运行其余代码, 浏览XLSX文件中的数据后。 问题是我的循环在我的可观察量列表包含任何内容之前结束, 第二个forkJoin()永远不会执行。

法典:

for(var objectives of this.XLSXObjectives) {
if(objectives.values != 0) {
// 2 requests (period and user)
var period$ = this.storePeriods.getPeriod(new Period({month: objectives.values[0].month, year: objectives.values[0].year}));
var user$ = this.storeUsers.getUser(new User({num_seller: objectives.values[0].userCode}));
// Observable (wallet)
var XLSXWalletSubject: BehaviorSubject<Wallet> = new BehaviorSubject<Wallet>(null);
var XLSXWallet$: Observable<any> = XLSXWalletSubject.asObservable();
XLSXWalletSubject.next(new Wallet({
wallet_name: objectives.values[0].walletName,
user: null,
period: null,
margin_m: 0,
value_100: objectives.values[0].ValueAt100Percent,
percentage_100_m: 0
}));
// Wait for the result of the three observables
forkJoin(period$, user$, XLSXWallet$).subscribe(
([period, user, wallet]) => {
console.warn("OK!");
wallet.period = period;
wallet.user = user;
// New request
var wallet$ = this.storeObjectives.addXLSXWallet(wallet);
// Request is added to a list of observables
this.observables$.push(wallet$);
wallet$.subscribe(w => { ... });
}
);
// Observable (wallet) complete
XLSXWalletSubject.complete();
}
}
console.warn(this.observables$); // this.observables$ = []
forkJoin(this.observables$).subscribe(results => {
console.log("It doesn't work!!!")
});

如果有人有解决方案来帮助我,谢谢。

我在这里不是 100% 确定,但我认为您需要的是以下内容:

from(this.XLSXObjectives).pipe(
filter(objectives => objectives.values != 0),
mergeMap(objectives => forkJoin(
this.storePeriods.getPeriod(new Period({month: objectives.values[0].month, year: objectives.values[0].year}),
this.storeUsers.getUser(new User({num_seller: objectives.values[0].userCode}),
of(new Wallet({
wallet_name: objectives.values[0].walletName,
user: null,
period: null,
margin_m: 0,
value_100: objectives.values[0].ValueAt100Percent,
percentage_100_m: 0
}))
)),
map(([period, user, wallet]) => ({ ...wallet, period, user})),
mergeMap(wallet => this.storeObjectives.addXLSXWallet(wallet)),
toArray()
).subscribe(results => { // handle array of results here });
  • 目标逐一发射
  • mergeMap订阅三个内部可观察量的forkJoin(如果您需要一次处理一个请求,请使用concatMap,如果您只需要一定数量的并发请求,请向mergeMap添加一个并发参数(
  • map调用使用period: perioduser: user填充wallet对象
  • 最终mergeMap使用wallet对象发出下一个请求
  • toArray调用等待整个可观察量完成,并以数组的形式发出所有接收的值

最新更新