RxJS订阅只工作一次



在我的Angular应用中,我手动从JSON中验证一些表单数据。

因此,我订阅了一个接收更改事件的Subject:

private _contentChanged = new Subject<IEditorContentChangedEvent>();
contentChanged$: Observable<IEditorContentChangedEvent>;
constructor() {
this.contentChanged$ = this._contentChanged.asObservable();
}
onContentChanged(event: IEditorContentChangedEvent): void {
this._contentChanged.next(event);
}
ngOnInit(): void {
this.contentChanged$
.pipe(
untilDestroyed(this),
debounceTime(1000),
filter(event => Boolean(event)),
map(() => {
this.resource = JSON.parse(this.fhirResourceString) as IFhirResourceType;
console.log(this.resource);
this.parentForm.setValue(this.resource);
return of(event);
}),
catchError((error: Error) => {
...
return EMPTY;
}),
)
.subscribe(() => {
...
});
}

第一次contentChanged$收到一个值时,我看到map()中的日志语句,但如果map中的代码抛出错误,则不是第二次。

我假设这是因为catchError()返回EMPTY。我一直在遵循Medium上的这篇文章之类的指南,但到目前为止还没有运气。

我已经尝试在catchError()处理程序中返回:of(error),但这也不起作用。

如果我只是订阅contentChanged$而没有任何操作符,我看到_contentChanged每次都收到一个新值,所以我的数据源按预期工作。

如果我做了一个不导致catchError被调用的更改,我在map中看到日志语句,所以它必须是catchError关闭流。

我如何保持可观察/订阅活着处理新的contentChanged$值?

正如您的评论setValue抛出错误,该错误正在CathError中被捕获,并且根据可观察行为源流在源流链(管道)中出现任何类型的错误或源流完成后将死亡

为了保持流存活,你需要在内部处理这个错误-

  • 如果任何子可观察对象导致错误,则

switchMap/ConcatMap...(() => child$.pipe(catchError(...))

  • 在普通javascript错误的情况下,用try catch block
  • 包装它

在你的情况下,它是JSON.parse错误,这是一个javascript错误,所以包装它与try catch

map(() => {
this.resource = JSON.parse(this.fhirResourceString) as IFhirResourceType;
console.log(this.resource);
this.parentForm.setValue(this.resource);
return of(event);
}),

用下面的

修改上面的代码段
map(() => {
try {
this.resource = JSON.parse(this.fhirResourceString) as IFhirResourceType;
} catch(err){
console.error(err);
}
console.log(this.resource);
this.parentForm.setValue(this.resource);
return event;
}),

可观察契约

这里是一个概述:link

相关部分:

在发出完整或错误通知后,可观察对象不能再发出任何通知。

这意味着一旦一个可观察对象完成或出错,它就结束了。终端排放/通知。


一个解决办法:

不要让你的可观察对象发出错误。如果你知道你的map中的一些同步代码抛出了一个错误,你可以捕捉它并在那里处理它,这样它就永远不会传播到你的可观察对象中:

try {
this.resource = JSON.parse(/* ... */);
} catch(err){
console.error(err);
}

另一个解决方案

一旦源可观察对象出现错误,只需重新订阅源可观察对象。这是否有效取决于当你首先订阅你的源时产生了什么副作用。

this.contentChanged$.pipe(
untilDestroyed(this),
debounceTime(1000),
filter(event => Boolean(event)),
map(() => {
...
}),
tap({error: (error: Error) => console.error(error) }),
// By default, if contentChanged$ keeps erroring, this will
// keep retrying for forever.
retry()
).subscribe(() => {
...
});

你也可以通过返回catchError

的可观察对象来有条件地重试。
this.contentChanged$.pipe(
untilDestroyed(this),
debounceTime(1000),
filter(event => Boolean(event)),
map(() => {
...
}),
catchError((error: Error, source$) => {
console.error(error)
if (/* retry? */){
return source$;
} else if (/* Do nothing? */) {
return EMPTY;
} else if (/* Emit some string values? */) {
return of("Hello", "World");
} 
// Otherwise rethrow the error!
return throwError(() => error);
})
).subscribe(() => {
...
});

最新更新