rxjs+angular:Web 服务调用中出错后的错误"killing"可观察量



所以,我已经开始玩rxjs了,在收到web服务调用的错误后,我有一个问题是如何保持可观察的实时性。

在显示代码之前,这里是我当前的场景:一个角度组件,它必须加载一个已分页的初始列表,也可以通过更改组合中的项来过滤。为了用rxjs解决这个问题,我考虑合并两个可观测值:一个处理select的更改事件,另一个用于加载更多项。这是我正在使用的代码:

const filtro$ = this.estadoPedido.valueChanges.pipe(
distinctUntilChanged(),
tap(_ => {
this._paginaAtual = 0;
this.existemMais = true;
}),
startWith(this.estadoPedido.value),
map(estado => new DadosPesquisa(this._paginaAtual,
this._elemsPagina,
estado,
false))
);

每当选择更改时,我都会重置全局页面计数器(tap运算符(,并且由于我想要进行初始加载,所以我也使用startWith运算符。最后,我将当前状态转换为一个具有加载值所需的所有值的对象。

我还有一个主题,每当点击加载更多项目按钮时都会使用:

dataRefresh$ = new Subject<DadosPesquisa>();

这两个可观测值被合并,这样我就可以有一个单独的路径来调用我的web服务:

this.pedidosCarregados$ = merge(filtro$, this.dataRefresh$).pipe(
tap(() => this.emChamadaRemota = true),
switchMap(info => forkJoin(
of(info),
this._servicoPedidos.obtemPedidos(this._idInstancia,
info.paginaAtual,
info.elemsPagina,
info.estado)
)),
shareReplay(),
tap(([info, pedidos]) => this.existemMais = pedidos.length === info.elemsPagina),
scan((todosPedidos, info) => !info[0].addPedidosToExisting ?
info[1] :
todosPedidos.concat(info[1]), []),
tap(() => this.emChamadaRemota = false),
catchError(erro => {
this.emChamadaRemota = false;
this.trataErro(erro);
this.existemMais = false;
return of([]);
})
);

简单回顾一下我在这里要做的事情。。。tap用于设置和清除控制等待微调器的字段(emChamadaRemota(,并用于控制是否应显示加载更多按钮(existemMais(。我在switchMap中使用forkJoin,因为我需要通过管道访问有关当前搜索的信息。scan在那里,因为加载更多的项目应该会将这些项目添加到上一个加载的页面。

现在,我还使用了一个拦截器,它负责设置正确的标头,并通过重试策略处理典型错误(401503等(。以下是intercept方法的代码:

intercept(req: HttpRequest<any>, next: HttpHandler): Observable<HttpEvent<any>> {
const headers = this.obtemHeaders();
const requestClonado = req.clone({ headers });
return next.handle(requestClonado).pipe(
retryWhen(this.retryStrategy()),
catchError(err => {
console.error(err);
let msgErro: string;
if(err instanceof HttpErrorResponse && this._servicoAutenticacao.trataErroFimSessao(err)) {
msgErro = "A sua sessão terminou. Vai ser redirecionado para a página de login" ;
}
else if(err.status === 503 ) {
msgErro = "O servidor não devolveu uma resposta válida (503).";
}
else {
msgErro = err.error && err.error.message ? err.error.message : "Ocorreu um erro no servidor.";
}
if(err.status !== 503) {
this._logger.adicionaInfoExcecao(msgErro).subscribe();
}
return throwError(msgErro);
}
));
} 

现在,问题是:如果我在web服务调用中出错,一切都会很好,但我的可观察对象会被"杀死"。。。这是有道理的,因为操作员应该捕捉到错误并"取消订阅"流(至少,这是我从读过的一些文章中理解的(。

我读过一些文章,其中说解决方案是创建一个内部可观察对象,该对象从不抛出,并封装从web服务调用返回的可观察对象。这是要走的路吗?如果是,我可以在拦截器级别执行吗?或者,在出现错误的情况下,我应该简单地重建我的可观察链(但不使用startWith运算符自动启动它(吗?

好吧,经过一些测试,我成功使其工作的唯一方法(不放弃我所拥有的重试/捕获错误传播(是在抛出异常时重建管道。因此,我将创建代码移到了一个方法中:

private setupPipeline(runFirstTime = true) {
const filtro$ = this.estadoPedido.valueChanges.pipe(
distinctUntilChanged(),
tap(_ => {
this._paginaAtual = 0;
this.existemMais = true;
}),
runFirstTime ? startWith(this.estadoPedido.value) : tap(),
map(estado => new DadosPesquisa(this._paginaAtual,
this._elemsPagina,
estado,
false))
);
this.pedidosCarregados$ = merge(filtro$, this.dataRefresh$).pipe( 
//same as before...
catchError(erro => {
this.emChamadaRemota = false;
this.trataErro(erro);
this.existemMais = false;
setTimeout(() => this.setupRxjs(false), 100); // reset pipeline
return of([]);
})
}

该方法是从init方法内部和catchError运算符内部调用的。我相信还有更好的方法,但重新创建管道让我可以按原样重用代码…