我有一个热的可观察到的连续发射消息。我需要使用API REST端点(如执行器/messages/pause
(暂停它,并使用其他API REST端点(例如/messages/resume
(继续它。在暂停时间内,我需要允许其他API REST端点发出类似原始可观察/messages/custom
的消息。
可以在该间隔期间暂停主要可观察流(暂停-恢复(,但不能停止观察模拟消息事件,并在恢复后继续/恢复主要可观察的流?
我认为这将是一种方法:
// The 2 sources of events.
const main$ = /* ... */;
const second$ = /* ... */;
// Emits when the `/pause` endpoint is reached.
// `pause.next(true);`
// When the `/resume` endpoint is reached: `pause.next(false)`
const pause = new Subject<boolean>();
const decoratedMain$ = main$.pipe(
withLatestFrom(paused.pipe(startWith(false))),
filter(([mainValue, isPaused]) => !isPaused),
map(([mainValue]) => mainValue),
);
const decoratedSecond$ = second$.pipe(
withLatestFrom(paused),
filter(([secondValue, isPaused]) => isPaused),
map(([secondValue]) => secondValue),
);
merge(decoratedMain$, decoratedSecond$).subscribe();
除非我遗漏了什么,否则上面的代码片段应该实现以下逻辑:
- 当
main$
可观测到发射时,second$
可观测到的事件将不被考虑在内 main$
可以暂停(使用pause.next(true)
(和恢复(使用pause.next(false)
(- 当
main$
暂停时,它的事件被忽略,现在考虑second$
可观测的事件 - 当
main$
恢复时,开关再次发生:second$
的事件被忽略,main$
的事件被考虑
现在让我们看看RxJS的魔法是如何实现这一点的。
正如您可能已经注意到的,主要逻辑围绕着withLatestFrom
和pause
主题。
可观测到的decoratedMain$
持续发射,但其事件是否被忽略取决于pause
的最新值。如果到达了/pause
端点,那么这些事件将被忽略,因为pause
将发出true
。
decoratedSecond$
对称构建。如果pause
的最新值为false
,则忽略其事件。
最后,我认为分享上述方法的一个小变化会很有帮助,只是为了学习目的:
/* ... */
const isMainPaused$ = pause.pipe(filter(Boolean));
const isMainResumed$ = pause.pipe(filter(v => !v));
const decoratedMain$ = main$.pipe(
share({ resetOnRefCountZero: false }),
takeUntil(isMainPaused$),
repeatWhen(completionsSubject => completionsSubject.pipe(mergeMapTo(isMainResumed$)))
);
/* ... */
代码的其余部分与第一种方法相同。在这里,实际上是忽略main$
的事件,因为share
的Subject
实例没有任何订阅者。当pause
发出false
时,将为该Subject
实例创建一个新的订阅者,而无需重新订阅源,这是由于resetOnRefCountZero: false
选项。