当使用 rxjs 6 在 angular 6 中有新请求时,如何取消正在进行的 HTTP 请求



我的应用程序中有以下两个HTTP服务RxnsSearchServiceRxnsSearchHitCountService

用CCD_ 3处理两个请求,如下代码所示。

constructor(
private rxnsSearchService: RxnsSearchService,
private rxnsSearchHitCountService: RxnsSearchHitCountService
) { }
const rxnsObservable: Observable<Array<any>> = this.rxnsSearchService.getReactions(this.searchParams, filters);
const headCountObservable: Observable<number> = this.rxnsSearchHitCountService.getHitCount(this.searchParams, filters);
forkJoin([rxnsObservable, headCountObservable]).pipe().subscribe((results) => { //handling results 
},
error => {
console.log(error);
});

每当有新的请求时,我想取消正在进行的旧请求。有人能帮我吗?

export class RxnsSearchService {
sub: Subject<any> = new Subject();
constructor(private httpClient: HttpClient) {}
getReactions(params: Params, offset: number, perPage: number, filters: any) {
const body = {
filters: filters,
query: params.query
};
return this.httpClient.post(environment.rxnsSearch, body).pipe(
map((response: Array<any>) => {
return response;
}),
catchError(error => {
console.log(error);
return throwError(error);
})
);
}
}

export class RxnsSearchHitCountService {
constructor(private httpClient: HttpClient) {}
getHitCount(params: Params, filters: any) {
const body = {
filters: filters,
query: params.query,
};
return this.httpClient.post(environment.rxnsSearchHitCount, body).pipe(
map((response: number) => {
return response;
}),
catchError(error => {
console.log(error);
return throwError(error);
})
);
}
}

我将通过一个简化的例子来介绍如何做到这一点的一般方法。假设我们目前有这个:

public getReactions() {
this.http.get(…)
.subscribe(reactions => this.reactions = reactions);
}

确保旧请求被取消的方法是在某些主题上发射:

private reactionsTrigger$ = new Subject<void>();
public getReactions() {
this.reactionsTrigger$.next();
}

现在我们有了一个可观察的表示触发新请求的事件流的视图。您现在可以将OnInit实现为这样的东西:

public ngOnInit() {
this.reactionsTrigger$.pipe(
// Use this line if you want to load reactions once initially
// Otherwise, just remove it
startWith(undefined),
// We switchMap the trigger stream to the request
// Due to how switchMap works, if a previous request is still
// running, it will be cancelled.
switchMap(() => this.http.get(…)),
// We have to remember to ensure that we'll unsubscribe from
// this when the component is destroyed
takeUntil(this.destroy$),
).subscribe(reactions => this.reactions = reactions);
}
// Just in case you're unfamiliar with it, this is how you create
// an observable for when the component is destroyed. This helps
// us to unsubscribe properly in the code above
private destroy$ = new Subject<void>();
public ngOnDestroy() {
this.destroy$.next();
this.destroy$.complete();
}

线

switchMap(() => this.http.get(…)),

在您的情况下,可能会将事件切换到forkJoin:

switchMap(() => forkJoin([rxnsObservable, headCountObservable])),

如果您希望单个事件流重新触发两个请求。

查看显示HTTP请求的实际触发器的代码片段会很有帮助,但它很可能是一个在单击时调用函数的UI组件。

使用RxJS 6解决此问题的方法是使用Subject接收点击事件,然后使用switchMap运算符取消未完成的请求以防止背压。这里有一个例子:

private clickSubject$: Subject<void> = new Subject();
constructor() {
this.clickSubject$
.pipe(switchMap(() => forkJoin([rxnsObservable, headCountObservable])))
.subscribe((results) => // handling)
}
onClick() {
this.clickSubject$.next(undefined);
}

如果您有多个位置要执行http请求,则使用以下命令向主题发出:this.clickSubject$.next(undefined)

您可以简单地在RxJs:中使用debounce运算符

debounce(1000);

它是中提供的一种方法,用于在发送任何请求之前设置以毫秒为单位的延迟,

该示例仅用一个请求替换1000ms内激发的所有请求。

更多详细信息:

fromEvent(input, 'input').pipe(
map((e: any) => e.target.value),
debounceTime(500),
distinctUntilChanged()
).subscribe(
(data) => {
this.onFilterTable({column: fieldName, data});
}
);

最新更新