Hello,我有下面的Json结构,它在UpdateOrders
操作中作为有效载荷提供。实际上,我想迭代reservations
和orders
,调用this.orderApiService.updateOrder
服务并调度UpdateOrderProgress
操作。在UpdateOrderProgress
操作中,我想提供numberOfReservationsUpdated
和totalReservations
const reservationOrders = [
{
reservationNumber: '22763883',
orders: [
{
orderId: 'O12341',
amount: 25
},
{
orderId: 'O45321',
amount: 50
}
]
},
{
reservationNumber: '42345719',
orders: [
{
orderId: 'O12343',
amount: 75
}
]
}
];
我有以下效果来实现这一点,但不幸的是,这种效果不起作用,并引发了一个例外。
@Effect()
updateOrders$ = this.actions$.pipe(
ofType<UpdateOrders>(UpdateOrdersActionType.UPDATE_ORDERS),
filter((action) => !!action.reservationOrders),
exhaustMap((action) => {
return combineLatest(action.reservationOrders.map((x, index) => {
const totalReservations = action.reservationOrders.length;
const numberOfReservationsUpdated = index + 1;
return combineLatest(x.orders.map((order) => {
const orderUpdateRequest: OrderUpdateRequest = {
orderId: order.orderId,
amount: order.amount
};
return this.orderApiService.updateOrder(orderUpdateRequest).pipe(
switchMap(() => [new UpdateOrderProgress(numberOfReservationsUpdated, totalReservations)]),
catchError((message: string) => of(console.info(message))),
);
}))
}))
})
);
我怎样才能做到这一点?我缺少哪些RxJs运算符?
您可以切换到使用merge
和mergeMap
的组合来获得您想要的效果,而不是使用combineLatest
。
以下是您的问题陈述-
- 一个动作触发一个可观察的
- 这需要触发多个可观测值
- 然后,这些可观察性中的每一个都需要触发一些操作(UPDATE_action(
实现这一点的一种方法如下-
const subj = new Subject<number[]>();
const getData$ = (index) => {
return of({
index,
value: 'Some value for ' + index,
}).pipe(delay(index*1000));
};
const source = subj.pipe(
filter((x) => !!x),
exhaustMap((records: number[]) => {
const dataRequests = records.map((r) => getData$(r));
return merge(dataRequests);
}),
mergeMap((obs) => obs)
);
source.subscribe(console.log);
subj.next([3,1,1,4]); // Each of the value in array simulates a call to an endpoint that'll take i*1000 ms to complete
// OUTPUT -
// {index: 1, value: "Some value for 1"}
// {index: 1, value: "Some value for 1"}
// {index: 3, value: "Some value for 3"}
// {index: 4, value: "Some value for 4"}
鉴于以上解释,您的代码需要更改为类似-的内容
const getOrderRequest$ = (order: OrderUpdateRequest, numberOfReservationsUpdated, totalReservations) => {
const orderUpdateRequest: OrderUpdateRequest = {
orderId: order.orderId,
amount: order.amount
};
return this.orderApiService.updateOrder(orderUpdateRequest).pipe(
switchMap(() => new UpdateOrderProgress(numberOfReservationsUpdated, totalReservations)),
catchError((message: string) => of(console.info(message))),
);
}
updateOrders$ = this.actions$.pipe(
ofType<UpdateOrders>(UpdateOrdersActionType.UPDATE_ORDERS),
filter((action) => !!action.reservationOrders),
exhaustMap((action) => {
const reservationOrders = action.reservationOrders;
const totalLen = reservationOrders.length
const allRequests = []
reservationOrders.forEach((r, index) => {
r.orders.forEach(order => {
const req = getOrderRequest$(order, index + 1, totalLen);
allRequests.push(req);
});
});
return merge(allRequests)
}),
mergeMap(obs=> obs)
);
旁注-虽然示例中的嵌套可观察性可能会起作用,但由于http调用的固有性质需要未知的时间才能完成,因此您可能会看到错误的结果。这意味着,从你写的方式来看,在某些情况下,你可能会看到numberOfReservationsUpdated
并不是更新的实际预订数量的确切指示。
更好的方法是处理减速器中的状态信息。基本上,在UPDATE操作负载中传递reservationNumber
,并让reducer决定有多少请求等待完成。这将是系统状态的准确表示。此外,它将把@effect
中的逻辑简化为单个嵌套的可观察对象,而不是多个嵌套。