如何在 RxJS 6 和 Angular 6 中出错后保持可观察性



任何人都可以帮助解决每当this.http.get(...)出现错误时this._getReactions$.next()不起作用的情况。我想保持可观察性,以便进行下一个输入。

private _getReactions$: Subject<any> = new Subject();
constructor() {
this._getReactions$
.pipe(
switchMap(() => {
return this.http.get(...)
// http request 
}),
catchError(error => {
console.log(error);
return empty();
})
)
.subscribe(data => {
console.log(data)
//results handling
});
}

onClick() {
this._getReactions$.next();
}

如果可观察的死亡,它称之为错误处理程序,并且它们已关闭,您无法通过它们发送任何内容,这意味着它们已关闭,包括间隔在内的所有上游都已死。

如果我们想活下去怎么办。

屏蔽主观察者链是解决方案,
每当
触发请求时,都会将 catch 放入switchmapswitchmap创建 ajax 可观察的,这次使用catch.
switchmap有一种行为,它说我的来源 还没有完成,所以我真的不在乎孩子是否 完成我会继续前进。

constructor() {
this._getReactions$
.pipe(tap(value => { this.loading = true; return value }),
switchMap(() => {
return this.http.get(...).pipe(
catchError((error) => this.handleError(error)))
// http request
}),
)
.subscribe(data => {
console.log(data)
//results handling
this.error = false;
this.loading = false
});
}
private handleError(error: HttpErrorResponse) {
this.error = true;
console.log(error)
this.loading = false
return empty();

Live Demo

Detailed Info

PS:嵌套在任何flattening运算符中,如mergeMapconcatMapexhaustMap和其他平展运算符也可以。

我已经为所有请求找到了解决此问题的方法

创建一个加载程序文件,其中将执行所有请求

loader.ts

import { Observable, Subject, Subscription, EMPTY } from 'rxjs';
import { catchError, map, switchMap } from 'rxjs/operators';
export class Loader<T1, T> {
private _requestQueue: Subject<T1>;
private _errorQueue: Subject<Error>;
private _resultQueue: Observable<T>;
private _loaded = false;
constructor(loaderFunction: (T1) => Observable<T>) {
this._requestQueue = new Subject<T1>();
this._errorQueue = new Subject<Error>();
this._resultQueue = this._requestQueue.pipe(
switchMap(_ => {
this._loaded = false;
return loaderFunction(_).pipe(
catchError(error => {
this._loaded = true;
this._errorQueue.next(error);
// Returning EMPTY observable won't complete the stream
return EMPTY;
})
);
}),
map(_ => {
this._loaded = true;
return _;
}),
);
}
public load(arg?: T1): void {
this._requestQueue.next(arg);
}
public subscribe(successFn: (T) => any, errorFn?: (error: any) => void, 
completeFn?: () => void): Subscription {

this._errorQueue.subscribe(err => {
errorFn(err);
});
return this._resultQueue.subscribe(successFn, null, completeFn);
}
public complete() {
this._requestQueue.complete();
this._errorQueue.complete();
}

get loaded(): boolean {
return this._loaded;
}
}

在您将执行请求的其他文件中(简单(

export class Component {
readonly loader: Loader<ResponseType, RequestParamType>;
constructor() {
this.loader = new Loader(param => this.http.get(param));
this.loader.subscribe(res => {
// Your stuffs
}, (error) => { 
// Error Handling stuffs
}, () => {
// on Complete stuffs (Optional)
});
}
ngOnInit() {
this.loadData();
}
loadData() { // Call this function whenever you want to refresh the data
this.loader.load(params); // this param will directly passed to the http request
}
}

我在加载器中定义了其他参数,这可以帮助您喜欢加载状态和完成流的选项(在 ngOnDestroy 中(

祝您编码愉快!

你可以使用这个场景,它非常适合我

private _getReactions$: Subject<any> = new Subject();
constructor() {
this.init();
}
private init(){
this._getReactions$
.pipe(
switchMap(() => {
return this.http.get(...)
// http request 
}),
catchError(error => {
console.log(error);
throw error;
})
)
.subscribe(data => {
console.log(data)
//results handling
}, () => this.init());
}

如上所述,一个不错的解决方案是在switchMap()调用捕获错误:

switchMap(() => {
return this.http.get(...).pipe(
catchError((error) => this.handleError(error)))
}),

但是,如果您的pipe()运营商有许多switchMap()呼叫,因此在多种情况下可能会收到错误,则最佳解决方案是再次订阅。

例如:

void subscribeData(): void {
data.pipe(
tap(() => this.isLoading = true),
switchMap(() => this.http.get(...).pipe(
catchError(err => {
// lot of copy-paste logic
})
)),
switchMap(() => this.http.get(...).pipe(
catchError(err => {
// lot of copy-paste logic
})
)),
switchMap(() => this.http.get(...).pipe(
catchError(err => {
// lot of copy-paste logic
})
)),
switchMap(() => this.http.get(...).pipe(
catchError(err => {
// lot of copy-paste logic
})
)),
switchMap(() => this.http.get(...).pipe(
catchError(err => {
// lot of copy-paste logic
})
)),
switchMap(() => this.http.get(...).pipe(
catchError(err => {
// lot of copy-paste logic
})
)),
switchMap(() => this.http.get(...).pipe(
catchError(err => {
// lot of copy-paste logic
})
)),
tap(() => this.isLoading = false, () => this.isLoading = false),
takeUntil(this.destroy)
).subscribe(data => {...})
);

您不需要每次都使用相同的逻辑catchError(...)和复制粘贴。更简单的解决方案是在subcribe(...)调用中的(error) => {...}回调上再次订阅:

void subscribeData(): void {
data.pipe(
tap(() => this.isLoading = true),
switchMap(() => this.http.get(...)),
switchMap(() => this.http.get(...)),
switchMap(() => this.http.get(...)),
switchMap(() => this.http.get(...)),
switchMap(() => this.http.get(...)),
switchMap(() => this.http.get(...)),
switchMap(() => this.http.get(...)),
switchMap(() => this.http.get(...)),
switchMap(() => this.http.get(...)),
tap(() => this.isLoading = false, () => this.isLoading = false),
takeUntil(this.destroy)
).subscribe(data => {...}, err => {
//some other logic
// subscribe again
this.subscribeData();   // <--- subscribe again 
});
}

最新更新