RxJS:除了第一次,如何等待两个可观察量发出?



>我有相关数据。以最简单的形式:

export interface A { 
id: number;
name: string;
b_ids: number[];
b_str?: string;
}
export interface B { 
id: number;
name: string;
a_id?: number;
}

以下管道使用.join()将 B 对象的名称映射到A.b_str

withMappedBNames$ = this.bService.allB$.pipe(
combineLatestWith(this.aWithCreate$),
map(([bs, as]: [B[], A[]]) =>
as.map((a: A) => ({
...a,
b_str: bs
.filter((b: B) => a.b_ids?.includes(b.id))
.map((b: B) => b.name)
.join(', '),
} as A)
)
)
);

这样做的问题是,每当使用aWithCreate$在代码中创建 A 的实例时,allB中的数据也会刷新,因此管道在数据从 API 返回之前执行,因为aWithCreate$已经触发了它。

allB$是一个包含所有BB[]流。aWithCreate$是一个包含所有AA[]流,包括新创建的

流所以我的问题是,我怎样才能在第一次只发出一次的情况下完成这项工作,并让他们等待第二次及之后完成?

以下是aServicebService的补充代码。每当应创建新A时,都会将其传递给aCreate.next()。后端更新数据,因此我调用refreshB()来刷新数据

aService.ts

allA$ = this.http
.get<A[]>(this.aUrl)
.pipe(catchError(this.handleError));
private aCreate = new Subject<A>();
aCreate$ = this.aCreate.pipe(
throttleTime(1000),
switchMap((a: A) =>
this.http.post<A>(this.aUrl, a).pipe(
catchError(this.handleError)
)
),
tap(() => this.bService.refreshB()),
shareReplay(1)
);
aWithCreate$ = merge(this.allA$, this.aCreate$)
.pipe(
scan((acc: A[], value: A | A[]) =>
value instanceof Array ? [...value] : [...acc, value],
[] as A[]
)
);
// This function gets called from the component
createA = (a: A) => this.aCreate.next(a);

bService.ts

allB$ = this.http.get<B[]>(this.bUrl).pipe(
catchError(this.handleError),
share()
);
// Action
private bAssign = new Subject<boolean>();
bAssign$ = this.bAssign.asObservable().pipe(
switchMap(() => this.allB$)
);
refreshB = () => this.bAssign.next(true);

感谢帮助!

仅从你的标题来看,也许你可以使用 elementAt() https://rxjs.dev/api/operators/elementAt

最新更新