暂停并恢复Observable Stream,并且在使用RxJS暂停期间仅从其他流发出自定义消息



我有一个热的可观察到的连续发射消息。我需要使用API REST端点(如执行器/messages/pause(暂停它,并使用其他API REST端点(例如/messages/resume(继续它。在暂停时间内,我需要允许其他API REST端点发出类似原始可观察/messages/custom的消息。

可以在该间隔期间暂停主要可观察流(暂停-恢复(,但不能停止观察模拟消息事件,并在恢复后继续/恢复主要可观察的流?

我认为这将是一种方法:

// The 2 sources of events.
const main$ = /* ... */;
const second$ = /* ... */;
// Emits when the `/pause` endpoint is reached.
// `pause.next(true);`
// When the `/resume` endpoint is reached: `pause.next(false)`
const pause = new Subject<boolean>();
const decoratedMain$ = main$.pipe(
withLatestFrom(paused.pipe(startWith(false))),
filter(([mainValue, isPaused]) => !isPaused),
map(([mainValue]) => mainValue),
);
const decoratedSecond$ = second$.pipe(
withLatestFrom(paused),
filter(([secondValue, isPaused]) => isPaused),
map(([secondValue]) => secondValue),
);
merge(decoratedMain$, decoratedSecond$).subscribe();

除非我遗漏了什么,否则上面的代码片段应该实现以下逻辑:

  • main$可观测到发射时,second$可观测到的事件将不被考虑在内
  • main$可以暂停(使用pause.next(true)(和恢复(使用pause.next(false)(
  • main$暂停时,它的事件被忽略,现在考虑second$可观测的事件
  • main$恢复时,开关再次发生:second$的事件被忽略,main$的事件被考虑

现在让我们看看RxJS的魔法是如何实现这一点的。

正如您可能已经注意到的,主要逻辑围绕着withLatestFrompause主题。

可观测到的decoratedMain$持续发射,但其事件是否被忽略取决于pause的最新值。如果到达了/pause端点,那么这些事件将被忽略,因为pause将发出true

decoratedSecond$对称构建。如果pause的最新值为false,则忽略其事件。


最后,我认为分享上述方法的一个小变化会很有帮助,只是为了学习目的:

/* ... */
const isMainPaused$ = pause.pipe(filter(Boolean));
const isMainResumed$ = pause.pipe(filter(v => !v));
const decoratedMain$ = main$.pipe(
share({ resetOnRefCountZero: false }),
takeUntil(isMainPaused$),
repeatWhen(completionsSubject => completionsSubject.pipe(mergeMapTo(isMainResumed$)))
);
/* ... */

代码的其余部分与第一种方法相同。在这里,实际上是忽略main$的事件,因为shareSubject实例没有任何订阅者。当pause发出false时,将为该Subject实例创建一个新的订阅者,而无需重新订阅源,这是由于resetOnRefCountZero: false选项。

最新更新