RXJS:按顺序组合可观测值以发射



我有三个可观察到的ob1、ob2、ob2,它们都是API调用,我需要按顺序执行它们,并将值作为数组发出,如果有任何错误,我希望在catchError中捕获错误并停止进行进一步的API调用,例如:

someCombine(ob1, ob2.pipe(tap(val=> doSomething)), ob3.pipe(tap(val=> 
doSomething))).pipe(catchError((err)=> of(err))).subscribe(combinedVal=>{
if(err) {
processErr
} 
else 
doSomething
});

我试过使用zip

zip(ob1, ob2.pipe(tap(val=> doSomething)), ob3.pipe(tap(val=> 
doSomething))).pipe(catchError((err)=> of(err))).subscribe(combinedVal=>{
if(err) {
processErr
} 
else 
doSomething
});

但是zip将在完成第一个之前开始下一个可观察到的,我已经尝试过concat

concat(ob1, ob2.pipe(tap(val=> doSomething)), ob3.pipe(tap(val=> 
doSomething))).pipe(catchError((err)=> of(err))).subscribe(combinedVal=>{
if(err) {
processErr
} 
else 
doSomething
});

但是对于每个单独的可观察到的完成,这些值是单独发出的。

我该如何处理这种情况。

我想是这样的:

import { tap, delay, catchError } from 'rxjs/operators';
import { of, throwError, combineLatest } from 'rxjs';
// this is just to simulate the requests
const ob1$ = of('ob1').pipe(delay(5000)),
ob2$ = of('ob2').pipe(delay(1000)),
ob3$ = of('ob3').pipe(delay(3000));
// abstract to reuse, since we'll catch for each observable
const handleError = (err) => {
// process error
console.log('process error', err);
return throwError(err);
};
const doSomething = (val) => {
// do something
console.log('do something', val);
};
combineLatest(
ob1$.pipe(catchError(handleError), tap(doSomething)),
ob2$.pipe(catchError(handleError), tap(doSomething)),
ob3$.pipe(catchError(handleError), tap(doSomething))
).subscribe(value => console.log(value));
// value = ["ob1", "ob2", "ob3"].
// Only emits when/if all 3 observables emit/complete.

我意识到这并不是你所要求的,因为请求是同时发出的,并以任何可能的顺序发出,但得到的数组将被正确排序,并且只在最后发出。每个可观察到的将触发其自己的tapcatchError。除非您的用例非常特殊,否则这应该更可取。

bufferCount
bufferCount将发出最后x个发射数,因此如果您知道您有3个api请求,则在连接后有3个发射,则可以使用bufferCount(3)

const { of, concat, operators: { delay, bufferCount } } = rxjs;
const ob1$ = of ('ob1').pipe(delay(5000)),
ob2$ = of ('ob2').pipe(delay(1000)),
ob3$ = of ('ob3').pipe(delay(3000));
concat(ob1$, ob2$, ob3$).pipe(bufferCount(3)).subscribe({
next: next => console.dir(next) // ['ob1', 'ob2', 'ob3]
});
<script src="https://unpkg.com/@reactivex/rxjs@6.5.3/dist/global/rxjs.umd.js"></script>

--或--

自定义管道
drain管道操作符将在源可观测完成后发射,并同时发射所有发射,因此在联系后,您可以在最后获得所有结果,它将处理任何数量的发射。

const { of, concat, Subscriber, operators: { delay } } = rxjs;
class DrainSubscriber extends Subscriber {
	constructor(destination) {
super(destination);
this.result = [];
}
_next(value) {
this.result.push(value);
}
_complete() {
	this.destination.next(this.result);
super._complete();
}
}
class DrainOperator {
	call(subscriber, source) {
	return source.subscribe(new DrainSubscriber(subscriber));
}
}
function drain() {
	return (source) => source.lift(new DrainOperator());
}
const ob1$ = of ('ob1').pipe(delay(5000)),
ob2$ = of ('ob2').pipe(delay(1000)),
ob3$ = of ('ob3').pipe(delay(3000));
concat(ob1$, ob2$, ob3$).pipe(drain()).subscribe({
next: next => console.dir(next) // ['ob1', 'ob2', 'ob3]
});
<script src="https://unpkg.com/@reactivex/rxjs@6.5.3/dist/global/rxjs.umd.js"></script>

在TypeScript中https://stackblitz.com/edit/typescript-1yrvm2-drain-operator?file=index.ts

--promise alt--

基于承诺的alt

const { of, concat, operators: { delay } } = rxjs;
const ob1$ = of('ob1').pipe(delay(5000)),
ob2$ = of('ob2').pipe(delay(1000)),
ob3$ = of('ob3').pipe(delay(3000));
async function resolve() {
const res1 = await ob1$.toPromise();
const res2 = await ob2$.toPromise();
const res3 = await ob3$.toPromise();
return [res1, res2, res3];
}
resolve().then(results => {
console.dir(results); // ['ob1', 'ob2', 'ob3]
});
<script src="https://unpkg.com/@reactivex/rxjs@6.5.3/dist/global/rxjs.umd.js"></script>

最新更新