Rx distinctUntilChanged允许在事件之间的可配置时间之后重复



让我们考虑一下下面的代码

Rx.Observable.merge(
Rx.Observable.just(1),
Rx.Observable.just(1).delay(1000)
).distinctUntilChanged()
.subscribe(x => console.log(x))

我们预计1只记录一次。然而,如果我们想允许重复一个值,如果它的最后一次发射是在一段时间前可配置的话,该怎么办?我的意思是记录这两个事件

例如,有下面这样的会很酷

Rx.Observable.merge(
Rx.Observable.just(1),
Rx.Observable.just(1).delay(1000)
).distinctUntilChanged(1000)
.subscribe(x => console.log(x))

其中CCD_ 2接受某种超时以允许在下一个元素上重复。然而,这样的事情并不存在,我想知道是否有人知道一种优雅的方法来实现这一点,即使用高级运算符,而不干扰需要处理状态的过滤器

除非我有误解,否则我非常确信这可以通过windowTime:以相对直接的方式实现

Observable
.merge(
Observable.of(1),
Observable.of(1).delay(250), // Ignored
Observable.of(1).delay(700), // Ignored
Observable.of(1).delay(2000),
Observable.of(1).delay(2200), //Ignored
Observable.of(2).delay(2300)
)
// Converts the stream into a stream of streams each 1000 milliseconds long
.windowTime(1000)
// Flatten each of the streams and emit only the latest (there should only be one active 
// at a time anyway
// We apply the distinctUntilChanged to the windows before flattening
.switchMap(source => source.distinctUntilChanged())  
.timeInterval()
.subscribe(
value => console.log(value),
error => console.log('error: ' + error),
() => console.log('complete')
);

请参阅此处的示例(借用@martin的示例输入)

这是一个有趣的用例。我想知道是否有比我更简单的解决方案(注意,我使用的是RxJS 5):

let timedDistinctUntil = Observable.defer(() => {
let innerObs = null;
let innerSubject = null;
let delaySub = null;
function tearDown() {
delaySub.unsubscribe();
innerSubject.complete();
}
return Observable
.merge(
Observable.of(1),
Observable.of(1).delay(250),  // ignored
Observable.of(1).delay(700),  // ignored
Observable.of(1).delay(2000),
Observable.of(1).delay(2200), // ignored
Observable.of(2).delay(2300)
)
.do(undefined, undefined, () => tearDown())
.map(value => {
if (innerObs) {
innerSubject.next(value);
return null;
}
innerSubject = new BehaviorSubject(value);
delaySub = Observable.of(null).delay(1000).subscribe(() => {
innerObs = null;
});
innerObs = innerSubject.distinctUntilChanged();
return innerObs;
})
// filter out all skipped Observable emissions
.filter(observable => observable)
.switch();
});
timedDistinctUntil
.timestamp()
.subscribe(
value => console.log(value),
error => console.log('error: ' + error),
() => console.log('complete')
);

观看现场演示:https://jsbin.com/sivuxo/5/edit?js,控制台

整个逻辑被封装到Observable.defer()静态方法中,因为它需要一些额外的变量。

这一切是如何运作的:

  1. merge()是项目的来源。

  2. 我使用do()来正确捕获源何时完成,这样我就可以关闭内部计时器并发送正确的完成通知。

  3. map()运算符是最有趣的事情发生的地方。我重新指定它接收到的值,然后如果已经有一个有效的Observable(它是在不到1000ms前创建的=innerObs != null),则返回null。然后,我最终创建了一个新的Subject,在那里我将重新命名所有项目,并返回这个与.distinctUntilChanged()链接的BehaviorSubject。最后,我安排1秒的延迟来设置innerObs = null,这意味着当另一个值到达时,它将返回一个具有新.distinctUntilChanged()的新Observable。

  4. 那么filter()会让我忽略所有返回的null值。这意味着它每秒不会发射一个新的Observable超过一次。

  5. 现在我需要使用所谓的高阶Observable(Observable发射Observable)。因此,我使用switch()运算符,该运算符始终只订阅源发射的最新Observable。在我们的情况下,我们每秒最多只发射一次Observable,这要归功于上面使用的filter()),而这个内部Observable本身可以发射它想要的任意多个值,并且所有值都是将通过CCD_ 18,因此重复项被忽略。

此演示的输出将类似于以下输出:

Timestamp { value: 1, timestamp: 1484670434528 }
Timestamp { value: 1, timestamp: 1484670436475 }
Timestamp { value: 2, timestamp: 1484670436577 }
complete

正如您所看到的,值1被发射两次,具有cca 2s的延迟。然而,由于distinctUntilChanged()1,值2在100ms后顺利通过。

我知道这并不简单,但我希望它对你有意义:)

最新更新