NestJS/RxJS - 有没有更简单的方法可以"observe"一次?



我是NestJS&RxJS。我正在努力用惯用的RxJS编写东西,因为这个项目的目的之一是更好地学习它们,而不是试图绕过或破解它们的API。

无论如何,我有一个逻辑,我从OAuth2服务器查找JWKSet。为此,我使用了NestJS HttpService,它返回一个Observable。然后,我使用该observable将结果设置为ReplaySubject。然后,在我的JwtStrategy中,我使用secretOrKeyProvider函数订阅ReplaySubject,以便在每次收到HTTP请求时获得值

现在,我确信这种方法有很多错误,因为我几乎不知道如何使用RxJS。我最大的问题是最后一部分。当我订阅secretOrKeyProvider函数中的ReplaySubject时,我会立即取消订阅。这是因为我想清理剩余的订阅。

总的来说,我觉得订阅并立即取消订阅是非常错误的。我觉得我做错了什么。我正在寻求对这个代码的审查,以了解我可以做些什么改进。

虽然下面的所有代码都有效,但我的目标是被引导更好、更正确地使用RxJS。

@Injectable()
export class JwkService implements OnModuleInit, OnModuleDestroy {
private readonly logger = new Logger(JwkService.name);
readonly key = new ReplaySubject<string>();
private subscription: Subscription;
constructor(
private httpService: HttpService,
private configService: ConfigService
) {}
onModuleInit(): void {
this.subscription = this.httpService
.get(`${this.configService.get<string>(AUTH_SERVER_HOST)}${jwkUri}`)
.pipe(
map((res: AxiosResponse<JwkSet>) => jwkToPem(res.data.keys[0]))
)
.subscribe({
next: (key) => this.key.next(key),
error: (error: Error) => {
this.logger.error(
'CRITICAL ERROR: Unable to load JWKSet',
ajaxErrorHandler(error)
);
}
});
}
onModuleDestroy(): void {
if (this.subscription) {
this.subscription.unsubscribe();
}
}
}
@Injectable()
export class JwtStrategy extends PassportStrategy(Strategy) {
private readonly logger = new Logger(JwtStrategy.name);
constructor(
private readonly jwkService: JwkService
) {
super({
jwtFromRequest: ExtractJwt.fromAuthHeaderAsBearerToken(),
ignoreExpiration: false,
secretOrKeyProvider: (
req: Request,
rawJwt: string,
done: doneFn
) => {
jwkService.key
.subscribe({
next: (value: string) => done(null, value),
error: (error: Error) => {
this.logger.error(
'Error getting JWK key',
ajaxErrorHandler(error)
);
done(error);
}
})
.unsubscribe();
}
});
}
}

如果您不需要unsubscribe,RxJS会为您提供一些管道:

.get(`${this.configService.get<string>(AUTH_SERVER_HOST)}${jwkUri}`)
.pipe(
take(1),
map((res: AxiosResponse<JwkSet>) => jwkToPem(res.data.keys[0]))
)
...

.get(`${this.configService.get<string>(AUTH_SERVER_HOST)}${jwkUri}`)
.pipe(
first(),
map((res: AxiosResponse<JwkSet>) => jwkToPem(res.data.keys[0]))
)
...

如果取第一个值,first()take(1)管道将为您取消订阅。如果在任何情况下都没有价值,那么你可以手动取消订阅,或者只完成主题。

最新更新