[Rxjs]如何在Rxjs中对多个同步事件进行分组并只触发订阅者一次

  • 本文关键字:Rxjs 一次 事件 同步 rxjs
  • 更新时间 :
  • 英文 :


我有钻石流依赖关系,单个事件转换为多个,然后组合为一个,但会触发多次。

我想将它们分组为单个输出。

我已经找到了一个解决方案,但我不确定这是正确的方法还是反模式的。

如果我的解决方案可能很糟糕,那么实现这一目标的建议方法是什么?

原始

const obs1 = new Subject<number>();
const obs2 = obs1.pipe(map(v => v + 1));
const obs3 = obs1.pipe(map(v => v * 2));
const obs4 = combineLatest(obs1, obs2, obs3);
obs4.subscribe(console.log);
interval(2000)
.pipe(take(3))
.subscribe(obs1);

输出:

(3) [0, 1, 0]
(3) [1, 1, 0]
(3) [1, 2, 0]
(3) [1, 2, 2]
(3) [2, 2, 2]
(3) [2, 3, 2]
(3) [2, 3, 4]

我的解决方案

const obs1 = new Subject<number>();
const obs2 = obs1.pipe(map(v => v + 1));
const obs3 = obs1.pipe(map(v => v * 2));
const obs4 = combineLatest(obs1, obs2, obs3).pipe(
switchMap(v => of(v, asapScheduler)),
);
obs4.subscribe(console.log);
interval(2000)
.pipe(take(3))
.subscribe(obs1);

输出(也预期(:

(3) [0, 1, 0]
(3) [1, 2, 2]
(3) [2, 3, 4]

这个问题被称为Glitches,它实际上是实现反应式编程的一个常见挑战-Glitches

在没有特定前提的情况下,预期行为会出现故障


在处理事件时,应具有处理程序的中间触发器,用户负责解决问题并提供正确的事件序列。

但是,当将数据从模型绑定到视图时,该场景中的反应性完全是因为[任何视图更新都应该由IO效应触发,如用户输入、计时器、请求处理程序],而不是一般性问题。


Mobx通过事务的设计解决了该领域的故障。此设计提供了解决故障的默认行为。

但这只是解决故障问题的一种方法(专门设计用于提供对数据和衍生数据的反应性(,这并不意味着所有故障都应该以相同的方式解决,这取决于实际情况。

IMOcombineLatest具有相同源且同步的多个可观察器是一个"反模式"。很明显,这是可能发生的,有时你真的不知道这些可观测值是否同步。

您的解决方案完全可以,但我在这里的首选解决方案是使用debounceTime(0)。它将做与你的代码完全相同的事情,但更干净的IMO.

const obs1 = new Subject<number>();
const obs2 = obs1.pipe(map(v => v + 1));
const obs3 = obs1.pipe(map(v => v * 2));
const obs4 = combineLatest(obs1, obs2, obs3).pipe(
debounceTime(0)
);
obs4.subscribe(console.log);
interval(2000)
.pipe(take(3))
.subscribe(obs1);

尽管,在此之前,我会想一想,如果combineLatest100%必要的,那么大多数情况都可以通过其他技术来避免。这将是最好的解决方案。

例如,在同一个可观察的范围内进行所有变换:

const obs1 = new Subject<number>();
const obs4 = obs1.pipe(map((v) => [v, v + 1, v * 2])
obs4.subscribe(console.log);

如果你只是转换输入值,为什么不做一些类似的事情呢

interval(2000)
.pipe(
take(3),
map(val => ([val, val + 1, val * 2])),
)
.subscribe(array => console.log(array));

或者,您的真实数据源不仅仅是转换?

或者,如果您需要所有三个来源,并希望组合最新的值,withLatestFrom可能是一个不错的选择,这也会产生您的预期输出:

const obs1 = new Subject<number>();
const obs2 = obs1.pipe(map(v => v + 1));
const obs3 = obs1.pipe(map(v => v * 2));
const obs4 = obs1.pipe(
withLatestFrom(obs2, obs3)
);
obs4.subscribe(console.log);
interval(2000)
.pipe(take(3))
.subscribe(obs1);

最新更新