用程序方式将rxjs留给等待循环



我有一个应用程序,可以打开服务器抽取SSE的流。我希望循环一直运行到所需的值到达,然后离开循环继续。我看过takeWhile运算符,但找不到实现它的方法。我也不知道如何才能unsubscribe,因为流永远不会完成。。。

const stream = this.sseService.returnAsObservable();
for await (const data of eachValueFrom(stream)) {
console.log(data);
if (data.jobid === "JOB05879") {
this.sseService.stopConnection();
// how to get out now?
}
}
console.log('we are out');

通常,混合promise和observable是一种反模式。如果你不介意的话,那么这里有一种方法可以只获取第一个数据,其中data.jobid==="JOB05879";。您不需要等待,因为您只需要从该流中获得1个值。

const stream = this.sseService.returnAsObservable().pipe(
filter(data => data.jobid === "JOB05879"),
first()
);
data = await stream.toPromise();
console.log(data);
this.sseService.stopConnection();
console.log('we are done');

没有承诺:

const stream = this.sseService.returnAsObservable().pipe(
filter(data => data.jobid === "JOB05879"),
first()
).subscribe({
next: data => {
console.log(data);
this.sseService.stopConnection();
},
error: _ console.log("Data with jobid JOB05879 not found"),
complete: () => console.log("We are done");
});

更新#1:使用forkJoin

您编写了以下内容,首先获取finito的数组,然后使用该数组作为notes的输入。

const finito = await forkJoin([     
this.jobsService.submitTestJob('blabla1').pipe(first()),     
this.jobsService.submitTestJob('blabla2').pipe(first())
]).toPromise(); 
const notes = await forkJoin([     
this.sseService.returnAsObservable().pipe(
filter((d: any) => d.jobid === finito[0].jobid), 
first()
),     
this.sseService.returnAsObservable().pipe(
filter((d: any) => d.jobid === finito[1].jobid), 
first()
) 
]).toPromise();

这应该有效,尽管你应该做一些错误检查

这实际上可以使用RxJS运算符来简化,这样您就不必等待每个finito

const notes = await merge(
this.jobsService.submitTestJob('blabla1').pipe(first()),
this.jobsService.submitTestJob('blabla2').pipe(first()),
this.jobsService.submitTestJob('blabla3').pipe(first()),
this.jobsService.submitTestJob('blabla4').pipe(first()),
this.jobsService.submitTestJob('blabla5').pipe(first()),
).pipe(
mergeMap(finito => this.sseService.returnAsObservable().pipe(
filter((d: any) => d.jobid === finito.jobid), 
first()
)),
toArray()
).toPromise();

如果你愿意的话,还有一个让它更加简洁的步骤:

const notes = await from([
"blabla1", 
"blabla2", 
"blabla3", 
"blabla4", 
"blabla5"
]).pipe(
mergeMap(blabla => 
this.jobsService.submitTestJob(blabla).pipe(first())
),
mergeMap(finito => this.sseService.returnAsObservable().pipe(
filter((d: any) => d.jobid === finito.jobid), 
first()
)),
toArray()
).toPromise();

更简洁:

const notes = await from([1,2,3,4,5]).pipe(
map(num => `blabla${num}`),
mergeMap(blabla => 
this.jobsService.submitTestJob(blabla).pipe(first())
),
mergeMap(finito => this.sseService.returnAsObservable().pipe(
filter((d: any) => d.jobid === finito.jobid), 
first()
)),
toArray()
).toPromise();

只拨打1个sseService电话

从您的示例代码来看,您似乎可以只对这个.seService.returnAsObservable((进行一次调用。它可以过滤任何允许的作业。

看起来可能是这样的:

const params = [1,2,3,4,5];
const notes = await from(params).pipe(
map(num => `blabla${num}`),
mergeMap(blabla => 
this.jobsService.submitTestJob(blabla).pipe(first())
),
toArray(),
mergeMap(finitoArray => this.sseService.returnAsObservable().pipe(
filter(d => finitoArray.map(f => f.jobid).includes(d.jobid))
)),
take(params.length),
toArray()
).toPromise();

把这个例子带回你写的两阶段代码,它看起来像这样:

const params = [1,2,3,4,5];
const streams = params
.map(num => `blabla${num}`)
.map(blabla => this.jobsService.submitTestJob(blabla).pipe(first())
const finito = await forkJoin(streams).toPromise(); 
const notes = await this.sseService.returnAsObservable().pipe(
filter(d => finitoArray.map(f => f.jobid).includes(d.jobid)),
take(params.length),
toArray()
).toPromise();

最新更新