多次使用forkJoin

  • 本文关键字:forkJoin rxjs
  • 更新时间 :
  • 英文 :


我正在做一个项目,我们的客户端同时生成近500个请求。我使用forkJoin来获取所有的响应作为数组。

但是在40-50个请求之后服务器会阻止请求或者只发送错误。我必须将这500个请求分成10个请求和loop over this chunks array的块,并且必须为每个块调用forkJoin,并将可观察对象转换为Promise。

有没有办法摆脱卡盘上的for loop?

如果我理解对了你的问题,我认为你的情况与此类似

const clientRequestParams = [params1, params2, ..., params500]
const requestAsObservables = clientRequestParams.map(params => {
return myRequest(params)
})
forkJoin(requestAsObservables).subscribe(
responses => {// do something with the array of responses}
)

,问题可能是服务器无法并行加载这么多请求。

如果我的理解是正确的,如果,如您所写,并发请求的限制为10,您可以尝试使用mergeMap操作符同时指定concurrent参数。

因此,一个解决方案可能如下

const clientRequestParams = [params1, params2, ..., params500]
// use the from function from rxjs to create a stream of params
from(clientRequestParams).pipe(
mergeMap(params => {
return myRequest(params)
}, 10) // 10 here is the concurrent parameter which limits the number of 
// concurrent requests on the fly to 10
).subscribe(
responseNotification => {
// do something with the response that you get from one invocation
// of the service in the server
}
)

如果采用此策略,则可以限制并发性,但不能保证响应序列的顺序。换句话说,第二个请求可以在第一个请求返回之前返回。因此,您需要找到某种机制来将响应链接到请求。一种简单的方法是不仅返回来自服务器的响应,还返回用于调用该特定请求的参数。在本例中,代码看起来像这样

const clientRequestParams = [params1, params2, ..., params500]
// use the from function from rxjs to create a stream of params
from(clientRequestParams).pipe(
mergeMap(params => {
return myRequest(params).pipe(
map(resp => {
return {resp, params}
})
)
}, 10) 
).subscribe(
responseNotification => {
// do something with the response that you get from one invocation
// of the service in the server
}
)

使用此实现,您将创建一个流,该流通知从服务器接收到的响应和在特定调用中使用的参数。

您还可以采用其他策略,例如返回响应和表示该响应的序列号,或者其他。

最新更新