RxJS正在等待数组元素完成,但未按预期工作



我期望以下RxJS行为:对于源数组中的每个元素,执行一个需要等待完成的操作(注释掉的部分(,然后在源数组中获取下一个元素,再次等待,以此类推。但我得到的行为是,同时提取源阵列中的所有元素,然后在延迟后再次重试。

import { from, defer, delay, repeat, tap } from 'rxjs';
const source$ = from([1, 2, 3])
const actions$ = source$.pipe(
tap((t) => console.log(t))
// ... action that takes long and needs to be waited for, before going to the next element in source$
)
const timedExecution$ = defer(() => actions$).pipe(
delay(3000),
repeat(3)
)
timedExecution$.subscribe();

我还尝试了另一种方法,使用timer:

import { from, tap, timer } from 'rxjs';
const source$ = from([1, 2, 3])
const actions$ = source$.pipe(
() => timer(0, 3000),
tap((t) => console.log(t))
// actionThatTakesLong() action that takes long and needs to be waited for, before going to the next element in source$
)
actions$.subscribe();

这里,它一次发射一个,但有时actionThatTakesLong()的时间比定时器的任意3000MS值要长,我需要它等待,直到它完成,而不是等待的硬编码值。感谢您提前提供任何提示

您的源Observable是from(),它是一个同步Observable,在订阅时立即发出一个又一个数组项。它不在乎(也不在乎(链中的价值观会发生什么。

delay()将获取每个值并将其延迟一定时间,但它不(也不能(关心之前的值是否已经到达您的观察者。它只需要获取每个值并将其延迟3秒,而无需等待上一次延迟完成,因此在您的情况下,看起来所有值都是同时发出的。

相反,您要做的是添加concatMap()运算符,该运算符将等待嵌套的延迟Observable完成:

from([1, 2, 3])
.pipe(
concatMap(value => of(value).pipe(delay(3000))),
)
.subscribe(...);

仅供参考,你提到的第二个选项与你想象的完全不同:

const actions$ = source$.pipe(
() => timer(0, 3000),
tap(() => ...),
);

这实际上是将源Observablefrom()替换为不同的Observabletimer(0, 3000)。您基本上使用的是用于创建自定义运算符的方法https://rxjs.dev/guide/operators#creating-全新运营商。

最新更新