我在可观察量(courses$
(上使用RxJSshareReplay()
运算符在其他两个可观察量(beginnerCourses$
和advancedCourses$
(之间共享可观察流。它工作正常,成功后在两个可观察量之间共享单个 API 调用响应。
但是,当涉及到错误时,这些可观察到的不共享错误,并且在浏览器控制台中可以看到错误被抛出两次。shareReplay()
运算符不是也共享错误吗?这是预期的行为吗?
const http$ = createHttpObservable('/api/courses');
const courses$ = http$
.pipe(
map(res => res['payload'] ),
shareReplay(),
catchError(err => {
return throwError(err);
})
);
this.beginnerCourses$ = courses$
.pipe(
map(courses => courses
.filter(course => course.category === 'BEGINNER')));
this.advancedCourses$ = courses$
.pipe(
map(courses => courses
.filter(course => course.category === 'ADVANCED')));
}
我认为这是预期的行为,有点出乎意料的是你得到了 2 个不同的错误。
shareReplay
在数据使用者和数据生成者之间放置ReplaySubject
。当错误通知到达时,正在使用ReplaySubject
将向所有注册的订阅者发送相同的错误通知:
error(err: any) {
if (this.closed) {
throw new ObjectUnsubscribedError();
}
this.hasError = true;
this.thrownError = err;
this.isStopped = true;
const { observers } = this;
const len = observers.length;
const copy = observers.slice();
for (let i = 0; i < len; i++) {
copy[i].error(err);
}
this.observers.length = 0;
}
源
但是当使用shareReplay
时,当发生错误时,当新订户即将订阅时,正在使用的ReplaySubject
将被另一个替换。说它正在被替换,这也需要重新订阅源。
所以我认为所有订阅者都应该收到相同的错误通知,只要他们已经是ReplaySubject
订阅者列表的一部分。否则,当新订阅者进来时,源将被重新订阅。
您可以做的是防止ReplaySubject
接收错误通知并允许其订阅者按原样接收它,方法是使用materialize
和dematerialize
运算符:
const courses$ = http$
.pipe(
materialize(), // Everything as a `next` notification
map(res => res['payload'] ),
shareReplay(),
dematerialize() // Back to the original event
);
使用这种方法,如果注册订阅者收到错误通知,它将被取消订阅,这意味着它也将从ReplaySubject
的订阅者列表中删除。但是ReplaySubject
仍然存在,并且不会在后续订阅者中被替换。
另外,我认为这是多余的:
catchError(err => throwError(err));
当可观察量抛出错误时,这是预期的行为,shareReplay(( 将尝试重新订阅/重新执行源代码
您可以尝试此操作来验证
const a=defer(()=>{
console.log('run')
return throwError(new Error('Error'))
}).pipe(shareReplay())
a.subscribe(console.log,console.error,()=>console.log('complete'))
a.subscribe(console.log,console.error,()=>console.log('complete'))
https://stackblitz.com/edit/typescript-jutfxe
如果您希望可观察共享错误而不再次执行 请改用publishReplay(1),refCount()