RxJS 并行队列,具有并发工作线程和每个请求的处理



我在使用 RxJS 和处理请求数组的正确方法时遇到了问题。 假设我有一个大约 50 个请求的数组,如下所示:

let requestCounter = 0;
function makeRequest(timeToDelay) {
return of('Request Complete!').pipe(delay(timeToDelay));
}
const requestArray = []
for(let i=0;i<25;i++){
requestArray.push(makeRequest(3000)); //3 seconds request
requestArray.push(makeRequest(1000)); //1 second request
}

我的目标是:

  • 并行启动请求
  • 只有 5 个可以运行同一时刻
  • 请求完成后,数组中的下一个请求将启动
  • 当请求完成(成功或错误)时,我需要将我的变量"请求计数器"递增 1(requestCounter++)
  • 当我在队列中的最后一个请求完成时,我需要订阅此事件并处理每个请求结果生成的数组

到目前为止,我最接近这样做的是按照这篇文章中的回应:

具有并发工作线程的 RxJS 并行队列?

问题是我发现了 RxJS,并且示例对我来说太复杂了,我找不到如何处理每个请求的计数器。

希望你能帮助我。 (对不起,英语蹩脚了,这不是我的母语)

编辑: 最终解决方案如下所示:

forkJoinConcurrent<T>(
observables: Observable<T>[],
concurrent: number
): Observable<T[]> {
return from(observables).pipe(
mergeMap((outerValue, outerIndex) => outerValue.pipe(
tap(// my code ),
last(),
catchError(error => of(error)),
map((innerValue, innerIndex) => ({index: outerIndex, value: innerValue})),
), concurrent),
toArray(),
map(a => (a.sort((l, r) => l.index - r.index).map(e => e.value))),
);
}

首先你应该使用 subject 来存储请求队列,看看mergeMap运算符,有一个concurrency参数你可以为最大并发性设置,还有一个index变量来跟踪调用次数

https://www.learnrxjs.io/operators/transformation/mergemap.html

const requestArray=new Subject()
for(let i=0;i<25;i++){
requestArray.next(makeRequest(3000)); //3 seconds request
requestArray.next(makeRequest(1000)); //1 second request
}
requestArray.pipe(
mergeMap((res,index)=>of([res,index]),
res=>res,5),
map((res,index)=>{if(index===25) .... do your thing ; return res;})
).subscribe(console.log)

最新更新