我开始学习可观察性,我有一些关于如何做以下事情的问题:
- 我有一个对象数组
- 我想为数组中的每个项使用一个API
- 对于每个响应,我想将其组合为一个响应对象数组
- 我正在使用一个发送消息的API,所以我希望在发送消息时,它等待继续下一个项目
我正在使用NestJS(typescript(
我已经解决了前3项。
我的问题是,我正在使用一个消息传递API,我制作的代码不等待一个可观察到的响应有效,并且它已经在执行下一个响应,这会影响它无序发送消息。
return of(individualMessages).pipe(
map(messages => messages.map(body =>
this.sendSingleWhatsappMessage(url, body, config, inputQuery)
.pipe( map((resp) => resp)))),
mergeMap(ApiRes => forkJoin(ApiRes)),
);
例如,我有这个阵列
const messages = [ { id: "9r8f", body: "Hello"},{ id: "9r8f", body: "Good morning"}]
对于数组中的每一项,我都要使用消息API
API响应是一个id为的状态代码
{ statusCode: 200, id="87438fsdfhsd"}
我希望它返回数组中组合的每个消耗的响应
[{ statusCode: 200, id="87438fsdfhsd"}, { statusCode: 200, id="72448fsd66hsd"}]
我这样做是正确的,我的问题是Api可能需要很长时间来发送消息,当这种情况发生时,消息会在中无序显示
示例:
- 早上好=>消息2
- 你好=>消息1
您的原始代码简化为:
return of(individualMessages).pipe(
map(messages => messages.map(
({body}) => this.sendSingleWhatsappMessage(url, body, config, inputQuery)
)),
mergeMap(requests => forkJoin(requests)),
);
行为:
map
将您的Messages
数组转换为请求数组forkJoin
创建一个单独的可观察对象,它将同时执行所有请求并返回一个结果数组- CCD_ 4订阅该内部";forkJoin observable";并发出其结果(结果数组(
基于此,我预计您的结果数组的顺序是正确的,但它们可能不一定按顺序完成(由于它们都是同时发送的,因此无法保证顺序(。
如果你想按顺序执行每个请求,你可以这样做:
return from(individualMessages).pipe(
concatMap(({body}) => this.sendSingleWhatsappMessage(url, body, config, inputQuery)),
toArray()
);
行为:
from
单独发出每个数组项- CCD_ 6将订阅每个请求"0";一次一个";并发出其结果
toArray()
将收集所有接收到的发射,并在可观测源完成后作为阵列发射