在管道内使用concatMap两次的更好选择



我想在另一个promise(promise 1(解析后递归执行一个预定义的promise(promise 2(,看看我与外围设备的连接何时返回false,然后我可以做其他事情。我希望能够使用可观察性来做到这一点。这是我到目前为止提出的代码,有两个问题:

1(有没有更好的替代方案,可以在reposition((函数中的代码中使用两次concatMap,

2(有没有更好的方法来实现我的代码来完成我上面描述的

以下代码使用可观察性递归地执行promise 1(仅一次(和promise 2,然后对结果进行处理:

reprovision(){
this.bleMessage = this.prepareMessageService.prepareMessage(
MessageCategory.customAction, CustomActionConstants.reboot, this.customPayload
);
from(this.bleService.bleWrite('custom', this.bleMessage)).pipe(
concatMap(x => interval(1000)),
concatMap(x => this.bleService.checkBleConnection1(this.deviceId)),
takeWhile(x => x, true)).subscribe(
resp => console.log('still connected to peripheral'),
err =>  {
console.log('disconnected from peripheral, do something else');
}
);
}

这是promise 2(this.bleService.checkBleConnection1(的代码,我希望递归执行该代码,直到外围设备与我的移动设备断开连接。这个承诺背后的想法是,只要我的移动设备仍连接到外围设备,它就应该解决,并在连接失败时抛出错误。

checkBleConnection1(deviceId: string){
return BleClient.getConnectedDevices([]) //BleClient.getConnectedDevices([]) returns a promise
.then(resp => {
if (!resp.some(elem => deviceId === elem.deviceId)){
throw new Error('Peripheral connection failed');
}
return true;
})
.catch(error => {
throw error;
});
}

从你的帖子中,我所理解的是

  • 如果找到设备,您希望通过无限调用this.bleService.checkBleConnection1(this.deviceId)以1秒的间隔递归检查
  • 如果找不到设备,您不想检查更多

因此,对于递归调用,Rxjs中有一个名为expand链接的运算符

在返回递归调用的同时添加delay(1000)以实现1秒的间隔

因此,这是我使用expand代替concatMap(x => interval(1000))的版本

使用expand不需要使用takeWhile,所以我将其删除

reprovision(){
this.bleMessage = this.prepareMessageService.prepareMessage(
MessageCategory.customAction, CustomActionConstants.reboot, this.customPayload
);
from(this.bleService.bleWrite('custom', this.bleMessage))
.pipe(
expand((x) => from(this.bleService.checkBleConnection1(this.deviceId)).pipe(delay(1000)))
)
.subscribe({
next: (resp) => console.log('still connected to peripheral'),
error: (err) =>
console.error('disconnected from peripheral, do something else'),
complete: () => console.log('Completed !'),
});
}

我相信这里的目标是在完成第一个承诺后每1秒检查一次连接(附带说明:这被称为轮询,而不是递归,如果你从reprovision()内部调用reprovision()或类似的东西,它将是递归的,你可以递归地轮询,但你不在这里,除非迫不得已,否则通常不想这样做(。

你不能真正摆脱第二个concatMap,因为你必须根据间隔切换到它,你可以像这样分离和清理流:

const pollConnection$ = interval(1000).pipe(
concatMap(i => this.bleService.checkBleConnection1(this.deviceId)),
takeWhile(x => x, true)
);
from(this.bleService.bleWrite('custom', this.bleMessage)).pipe(
concatMap(x => pollConnection$),
).subscribe(
resp => console.log('still connected to peripheral'),
err =>  {
console.log('disconnected from peripheral, do something else');
}
);

但你需要小心像concatMap这样的运营商,因为它可能会产生背压,这对应用程序不利。如果这些请求的响应时间可能超过1秒,那么您将构建一个请求队列,这可能会降低应用程序的性能。更安全的选择是switchMapexhaustMap,这取决于所需的行为

最新更新