Angular 10的forkJoin取消请求



这应该是一个简单的角度专业人士在那里。我有这个简单的方法:

private addMultiple(products: Product[]): void {
let observables: Observable<Product>[] = [];
products.forEach((product: Product) => observables.push(this.productService.create(product)));
forkJoin(observables).subscribe();
}

如果我发送一个产品的方法,它工作得很好,但如果我尝试多个(100),我只是得到取消的请求负载,我不知道为什么。

有人知道我是否应该使用不同的方法吗?


这是我的productService:

import { Inject, Injectable } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { finalize, map } from 'rxjs/operators';
import { BehaviorSubject, Observable } from 'rxjs';
import { Product, Attempt, RequestOptions } from '../models';
import { HttpServiceConfig, HTTP_SERVICE_CONFIG } from '../configs';
@Injectable({
providedIn: 'root',
})
export class ProductService {
private endpoint: string = 'products';
public items: BehaviorSubject<Product[]>;
public loading: BehaviorSubject<boolean>;
constructor(
@Inject(HTTP_SERVICE_CONFIG) public config: HttpServiceConfig,
private httpClient: HttpClient
) {
this.items = new BehaviorSubject<Product[]>([]);
this.loading = new BehaviorSubject<boolean>(false);
}
list(categoryId: number, options?: RequestOptions): Observable<Product[]> {
this.loading.next(true);
return this.httpClient
.get<Attempt<Product[]>>(
`${this.config.apiUrl}/categories/${categoryId}/${this.endpoint}/master`,
options?.getRequestOptions()
)
.pipe(
map((response: Attempt<Product[]>) => {
if (response.failure) return response.result;
this.items.next(response.result);
return response.result;
}),
finalize(() => this.loading.next(false))
);
}
get(id: number, slug: string, options?: RequestOptions): Observable<Product> {
return this.httpClient
.get<Attempt<Product>>(
`${this.config.apiUrl}/${this.endpoint}/${id}?slug=${slug}`,
options?.getRequestOptions()
)
.pipe(
map((response: Attempt<Product>) => {
return response.result;
})
);
}
public create(item: Product, options?: RequestOptions): Observable<Product> {
return this.httpClient
.post<Attempt<Product>>(
`${this.config.apiUrl}/${this.endpoint}`,
item,
options?.getRequestOptions()
)
.pipe(
map((response: Attempt<Product>) => {
if (response.failure) return response.result;
const newItem = response.result;
const items = this.items.value;
items.push(newItem);
this.items.next(items);
return response.result;
})
);
}
public updateSpecification(
item: Product,
options?: RequestOptions
): Observable<Product> {
return this.httpClient
.put<Attempt<Product>>(
`${this.config.apiUrl}/${this.endpoint}/specification`,
item,
options?.getRequestOptions()
)
.pipe(
map((response: Attempt<Product>) => {
if (response.failure) return response.result;
const newItem = response.result;
const items = this.items.value;
this.remove(items, newItem.id);
items.push(newItem);
this.items.next(items);
return response.result;
})
);
}
public approve(item: Product, options?: RequestOptions): Observable<Product> {
return this.httpClient
.put<Attempt<Product>>(
`${this.config.apiUrl}/${this.endpoint}/approve`,
item,
options?.getRequestOptions()
)
.pipe(
map((response: Attempt<Product>) => {
if (response.failure) return response.result;
const newItem = response.result;
const items = this.items.value;
this.remove(items, newItem.id);
items.push(newItem);
this.items.next(items);
return response.result;
})
);
}
public reject(item: Product, options?: RequestOptions): Observable<Product> {
return this.httpClient
.put<Attempt<Product>>(
`${this.config.apiUrl}/${this.endpoint}/reject`,
item,
options?.getRequestOptions()
)
.pipe(
map((response: Attempt<Product>) => {
if (response.failure) return response.result;
const newItem = response.result;
const items = this.items.value;
this.remove(items, newItem.id);
items.push(newItem);
this.items.next(items);
return response.result;
})
);
}
private remove(items: Product[], id: number | string) {
items.forEach((item, i) => {
if (item.id !== id) {
return;
}
items.splice(i, 1);
});
}
}

如果服务器几乎同时受到太多请求的冲击,可能会取消请求,这就是使用具有100个Observables的forkJoin所发生的情况。

forkJoin开始执行;

如果您想控制并行执行的数量,您应该尝试设置concurrency参数的mergeMap。如果您希望将请求与响应相匹配,则此方法可能会稍微复杂一些。

代码可以像这样

// set the number of requests you want to fly concurrently
const concurrency = 10;
// use from to generate an Observable from an array of Products
from(products).pipe(
// use mergeMap to execute the request
mergeMap(product => this.productService.create(product).pipe(
// return an object which contains the result and the product which started the request
map(result => ({result, product}))
), concurrency),  // set the concurrency in mergeMap
// toArray transform the stream into an Array if this is what you want
toArray()
).subscribe(
res => {// do something with res, which is an array of objects containing objects with result and product}
)

重要的是要注意,如果任何createforkJoin期间,请求失败,剩余的请求将被取消。

如果你想确保所有请求不被取消,你需要在每个create上设置catchError

最新更新