如何强制流等待 interval() 运算符完成,然后再执行流的其余部分?



我的问题是:我的 Web 应用程序使用可能过期的身份验证令牌。当它们即将过期时,服务器会发送 401 错误,提示浏览器先执行令牌刷新请求。我正在使用retryWhen()运算符来处理这种情况。retryWhen()运算符使用以下代码请求新令牌。

public tokenExpired = ({
maxRetryAttempts = 1
}: {
maxRetryAttempts?: number,
} = {}) => (attempts: Observable<any>) => {
return attempts.pipe(
mergeMap((error, i) => {
const retryAttempt = i + 1;
if((error && error.headers && (!error.headers.get("reason-unauthorized") || !(error.headers.get("reason-unauthorized") === "authentication-token-expires-soon"))) || retryAttempt > maxRetryAttempts) {
return throwError(error);
}
return this.requestNewToken().pipe(
concatMap(refreshStatus => {
if(refreshStatus === TokenRefreshStatus.TOKEN_REFRESHED) 
return timer(0);
else if(refreshStatus === TokenRefreshStatus.TOKEN_REFRESH_FAILED) 
return throwError(error);
else if(refreshStatus === TokenRefreshStatus.AWAIT_REFRESH && this.loggedIn.value){
let maxWaitTimeInMilliSeconds = 4000; 
interval(100).pipe(takeWhile(() => !(this.tokenRefreshStatus !== TokenRefreshStatus.AWAIT_REFRESH || maxWaitTimeInMilliSeconds <=0)))
.subscribe(value => {maxWaitTimeInMilliSeconds -= 100;});
if(this.tokenRefreshStatus === TokenRefreshStatus.TOKEN_REFRESHED)
return timer(0); 
else return throwError(error);
}
else 
return throwError(error); 
}));
})
);
}

因此,retryWhen 运算符将tokenExpired作为其 lambdaretryWhen(tokenExpired())。由于几乎可以同时触发多个请求,因此我以这样一种方式编写代码:只有第一个请求将通过this.requestNewToken()触发令牌刷新http请求,并且通过retryWhen进行类似调用的任何其他请求都将从requestNewToken()方法获得AWAIT_REFRESH消息。这样,我可以防止对新令牌的多个请求。

收到AWAIT_REFRESH时,代码必须每 100 毫秒检查一次tokenRefreshStatus是否已更改为TOKEN_REFRESHEDTOKEN_REFRESH_FAILED。它现在可以重复此操作的时间超过大约 4000 毫秒。因此,当间隔达到 0 或更低时,间隔也必须停止重复maxWaitTimeInMilliSeconds。之后有一个 if 语句检查TokenRefreshStatus是否等于TOKEN_REFRESHED,如果是,它会发回timer(0)如果没有,它会抛出retryWhen运算符收到的原始错误。

我的问题是应用程序不会等待间隔代码完成。它只是直接转到下面的 if 语句,导致它总是抛出错误。我认为我不应该订阅间隔,而是应该将其作为新的可观察对象返回,该可观察对象执行间隔迭代,然后进行检查以查看令牌在间隔完成后是否刷新。我只是不知道我应该怎么做。

谁能指出我正确的方向?谢谢

我的问题是应用程序没有等待间隔代码完成。

发生这种情况是因为间隔创建了一个可观察量,它是异步的。我会尝试使您的刷新状态函数异步以在您希望它等待时调用 await(即:将您的间隔包装在等待它的承诺中(:

concatMap(async refreshStatus => {
.
.
.
function checkit(){
return new Promise((res, rej)=>{
interval(100).pipe(takeWhile(() =>{
if(!(this.tokenRefreshStatus !== TokenRefreshStatus.AWAIT_REFRESH || maxWaitTimeInMilliSeconds <=0)){
return true;
}
else{
res(true);
return false;
}
}))
.subscribe(value => {maxWaitTimeInMilliSeconds -= 100;});
});
}
await checkit();
.
.
.
})

我希望这能帮助您解决问题。

最新更新