当RxJS observable完成时,我如何执行异步代码



我想在可观察的完成时执行代码。在我的代码中,我执行以下操作:

compact(): Observable<FileManifest> {
return this.loadIndex().pipe(
mergeMap((index) => index.walk()),
map((entry) => entry.manifest),
notUndefined(),
writeAllMessages(this.newPath, ProtoFileManifest),
finalize(async () => {
await Promise.all([
promises.rm(this.journalPath, { force: true }),
promises.rm(this.manifestPath, { force: true }),
]);
await promises.rename(this.newPath, this.manifestPath);
}),
);
}

问题是finalize方法是为同步代码制作的。当我执行如上所述的异步代码时,代码将独立于订阅执行。

我希望这将在处理可观察的资源时执行,但我希望当我订阅时,我总是收到事件。

如何将异步代码放入finalize方法中?

谢谢Ulrich

一种方法是创建三个可观察器,而不是尝试全部创建合一。每个都将在您想要的顺序异步链中组成一个链接制作

为了使基于承诺的可观察器中的副作用是惰性的,我们使用defer。请注意,defer回调的返回值可以是可观察的,也可以是"ObservableInput";,这就是RxJS所称的它知道如何转向的值转换为可观测值。这个价值可以是一个承诺。

({
compact(): Observable<FileManifest> {
const writeToTempManifest$ = this.loadIndex().pipe(
mergeMap((index) => index.walk()),
map((entry) => entry.manifest),
notUndefined(),
writeAllMessages(this.newPath, ProtoFileManifest)
);
const removeOldManifest$ = defer(() =>
Promise.all([
promises.rm(this.journalPath, { force: true }),
promises.rm(this.manifestPath, { force: true }),
])
);
const renameNewManifest$ = defer(() =>
promises.rename(this.newPath, this.manifestPath)
);
return from([
writeToTempManifest$,
removeOldManifest$,
renameNewManifest$,
]).pipe(concatAll());
},
});

请注意,这些可观察性中的每一个都可能发出一些东西(尽管我不熟悉API(。第一个发出writeAllMessages运算符所做的任何操作,而第二个和第三个发出各自promise的解析值。在第二个的情况下,它是来自Promise.all的两元素数组。

如果你想抑制可观察到的发射值,同时保持它打开直到它完成,你可以创建一个这样做的操作符:

const silence = pipe(concatMapTo(EMPTY));

最新更新