如何等待一个可观察到的完成,然后再继续下一个



我开始学习可观察性,我有一些关于如何做以下事情的问题:

  1. 我有一个对象数组
  2. 我想为数组中的每个项使用一个API
  3. 对于每个响应,我想将其组合为一个响应对象数组
  4. 我正在使用一个发送消息的API,所以我希望在发送消息时,它等待继续下一个项目

我正在使用NestJS(typescript(

我已经解决了前3项。

我的问题是,我正在使用一个消息传递API,我制作的代码不等待一个可观察到的响应有效,并且它已经在执行下一个响应,这会影响它无序发送消息。

return of(individualMessages).pipe(
map(messages => messages.map(body =>
this.sendSingleWhatsappMessage(url, body, config, inputQuery)
.pipe( map((resp) => resp)))),
mergeMap(ApiRes => forkJoin(ApiRes)), 
);

例如,我有这个阵列

const messages = [ { id: "9r8f", body: "Hello"},{ id: "9r8f", body: "Good morning"}]

对于数组中的每一项,我都要使用消息API

API响应是一个id为的状态代码

{ statusCode: 200, id="87438fsdfhsd"}

我希望它返回数组中组合的每个消耗的响应

[{ statusCode: 200, id="87438fsdfhsd"}, { statusCode: 200, id="72448fsd66hsd"}]

我这样做是正确的,我的问题是Api可能需要很长时间来发送消息,当这种情况发生时,消息会在中无序显示

示例:

  1. 早上好=>消息2
  2. 你好=>消息1

您的原始代码简化为:

return of(individualMessages).pipe(
map(messages => messages.map(
({body}) => this.sendSingleWhatsappMessage(url, body, config, inputQuery)
)),
mergeMap(requests => forkJoin(requests)),
);

行为:

  • map将您的Messages数组转换为请求数组
  • forkJoin创建一个单独的可观察对象,它将同时执行所有请求并返回一个结果数组
  • CCD_ 4订阅该内部";forkJoin observable";并发出其结果(结果数组(

基于此,我预计您的结果数组的顺序是正确的,但它们可能不一定按顺序完成(由于它们都是同时发送的,因此无法保证顺序(。


如果你想按顺序执行每个请求,你可以这样做:

return from(individualMessages).pipe(
concatMap(({body}) => this.sendSingleWhatsappMessage(url, body, config, inputQuery)),
toArray()
);

行为:

  • from单独发出每个数组项
  • CCD_ 6将订阅每个请求"0";一次一个";并发出其结果
  • toArray()将收集所有接收到的发射,并在可观测源完成后作为阵列发射

相关内容

最新更新