RxJS:即使在源代码可观察量上使用"shareReplay()",每个可观察量也会单独执行&quo



我在可观察量(courses$(上使用RxJSshareReplay()运算符在其他两个可观察量(beginnerCourses$advancedCourses$(之间共享可观察流。它工作正常,成功后在两个可观察量之间共享单个 API 调用响应。

但是,当涉及到错误时,这些可观察到的不共享错误,并且在浏览器控制台中可以看到错误被抛出两次。shareReplay()运算符不是也共享错误吗?这是预期的行为吗?

const http$ = createHttpObservable('/api/courses');
const courses$ = http$
.pipe(
map(res => res['payload'] ),
shareReplay(),
catchError(err => {
return throwError(err);
})
);
this.beginnerCourses$ = courses$
.pipe(
map(courses => courses
.filter(course => course.category === 'BEGINNER')));
this.advancedCourses$ = courses$
.pipe(
map(courses => courses
.filter(course => course.category === 'ADVANCED')));
}

我认为这是预期的行为,有点出乎意料的是你得到了 2 个不同的错误。

shareReplay在数据使用者和数据生成者之间放置ReplaySubject。当错误通知到达时,正在使用ReplaySubject将向所有注册的订阅者发送相同的错误通知:

error(err: any) {
if (this.closed) {
throw new ObjectUnsubscribedError();
}
this.hasError = true;
this.thrownError = err;
this.isStopped = true;
const { observers } = this;
const len = observers.length;
const copy = observers.slice();
for (let i = 0; i < len; i++) {
copy[i].error(err);
}
this.observers.length = 0;
}

但是当使用shareReplay时,当发生错误时,当新订户即将订阅时,正在使用的ReplaySubject将被另一个替换。说它正在被替换,这也需要重新订阅源

所以我认为所有订阅者都应该收到相同的错误通知,只要他们已经是ReplaySubject订阅者列表的一部分。否则,当新订阅者进来时,源将被重新订阅。


您可以做的是防止ReplaySubject接收错误通知并允许其订阅者按原样接收它,方法是使用materializedematerialize运算符:

const courses$ = http$
.pipe(
materialize(), // Everything as a `next` notification
map(res => res['payload'] ),
shareReplay(),
dematerialize() // Back to the original event
);

使用这种方法,如果注册订阅者收到错误通知,它将被取消订阅,这意味着它也将从ReplaySubject的订阅者列表中删除。但是ReplaySubject仍然存在,并且不会在后续订阅者中被替换


另外,我认为这是多余的:

catchError(err => throwError(err));

当可观察量抛出错误时,这是预期的行为,shareReplay(( 将尝试重新订阅/重新执行源代码

您可以尝试此操作来验证

const a=defer(()=>{
console.log('run')
return throwError(new Error('Error'))
}).pipe(shareReplay())
a.subscribe(console.log,console.error,()=>console.log('complete'))
a.subscribe(console.log,console.error,()=>console.log('complete'))

https://stackblitz.com/edit/typescript-jutfxe

如果您希望可观察共享错误而不再次执行 请改用publishReplay(1),refCount()

最新更新