我有三个可观察到的ob1、ob2、ob2,它们都是API调用,我需要按顺序执行它们,并将值作为数组发出,如果有任何错误,我希望在catchError中捕获错误并停止进行进一步的API调用,例如:
someCombine(ob1, ob2.pipe(tap(val=> doSomething)), ob3.pipe(tap(val=>
doSomething))).pipe(catchError((err)=> of(err))).subscribe(combinedVal=>{
if(err) {
processErr
}
else
doSomething
});
我试过使用zip
zip(ob1, ob2.pipe(tap(val=> doSomething)), ob3.pipe(tap(val=>
doSomething))).pipe(catchError((err)=> of(err))).subscribe(combinedVal=>{
if(err) {
processErr
}
else
doSomething
});
但是zip将在完成第一个之前开始下一个可观察到的,我已经尝试过concat
concat(ob1, ob2.pipe(tap(val=> doSomething)), ob3.pipe(tap(val=>
doSomething))).pipe(catchError((err)=> of(err))).subscribe(combinedVal=>{
if(err) {
processErr
}
else
doSomething
});
但是对于每个单独的可观察到的完成,这些值是单独发出的。
我该如何处理这种情况。
我想是这样的:
import { tap, delay, catchError } from 'rxjs/operators';
import { of, throwError, combineLatest } from 'rxjs';
// this is just to simulate the requests
const ob1$ = of('ob1').pipe(delay(5000)),
ob2$ = of('ob2').pipe(delay(1000)),
ob3$ = of('ob3').pipe(delay(3000));
// abstract to reuse, since we'll catch for each observable
const handleError = (err) => {
// process error
console.log('process error', err);
return throwError(err);
};
const doSomething = (val) => {
// do something
console.log('do something', val);
};
combineLatest(
ob1$.pipe(catchError(handleError), tap(doSomething)),
ob2$.pipe(catchError(handleError), tap(doSomething)),
ob3$.pipe(catchError(handleError), tap(doSomething))
).subscribe(value => console.log(value));
// value = ["ob1", "ob2", "ob3"].
// Only emits when/if all 3 observables emit/complete.
我意识到这并不是你所要求的,因为请求是同时发出的,并以任何可能的顺序发出,但得到的数组将被正确排序,并且只在最后发出。每个可观察到的将触发其自己的tap
或catchError
。除非您的用例非常特殊,否则这应该更可取。
bufferCount
bufferCount
将发出最后x个发射数,因此如果您知道您有3个api请求,则在连接后有3个发射,则可以使用bufferCount(3)
const { of, concat, operators: { delay, bufferCount } } = rxjs;
const ob1$ = of ('ob1').pipe(delay(5000)),
ob2$ = of ('ob2').pipe(delay(1000)),
ob3$ = of ('ob3').pipe(delay(3000));
concat(ob1$, ob2$, ob3$).pipe(bufferCount(3)).subscribe({
next: next => console.dir(next) // ['ob1', 'ob2', 'ob3]
});
<script src="https://unpkg.com/@reactivex/rxjs@6.5.3/dist/global/rxjs.umd.js"></script>
--或--
自定义管道
此drain
管道操作符将在源可观测完成后发射,并同时发射所有发射,因此在联系后,您可以在最后获得所有结果,它将处理任何数量的发射。
const { of, concat, Subscriber, operators: { delay } } = rxjs;
class DrainSubscriber extends Subscriber {
constructor(destination) {
super(destination);
this.result = [];
}
_next(value) {
this.result.push(value);
}
_complete() {
this.destination.next(this.result);
super._complete();
}
}
class DrainOperator {
call(subscriber, source) {
return source.subscribe(new DrainSubscriber(subscriber));
}
}
function drain() {
return (source) => source.lift(new DrainOperator());
}
const ob1$ = of ('ob1').pipe(delay(5000)),
ob2$ = of ('ob2').pipe(delay(1000)),
ob3$ = of ('ob3').pipe(delay(3000));
concat(ob1$, ob2$, ob3$).pipe(drain()).subscribe({
next: next => console.dir(next) // ['ob1', 'ob2', 'ob3]
});
<script src="https://unpkg.com/@reactivex/rxjs@6.5.3/dist/global/rxjs.umd.js"></script>
在TypeScript中https://stackblitz.com/edit/typescript-1yrvm2-drain-operator?file=index.ts
--promise alt--
基于承诺的alt
const { of, concat, operators: { delay } } = rxjs;
const ob1$ = of('ob1').pipe(delay(5000)),
ob2$ = of('ob2').pipe(delay(1000)),
ob3$ = of('ob3').pipe(delay(3000));
async function resolve() {
const res1 = await ob1$.toPromise();
const res2 = await ob2$.toPromise();
const res3 = await ob3$.toPromise();
return [res1, res2, res3];
}
resolve().then(results => {
console.dir(results); // ['ob1', 'ob2', 'ob3]
});
<script src="https://unpkg.com/@reactivex/rxjs@6.5.3/dist/global/rxjs.umd.js"></script>