我有一个发出数字的可观察量(整数,双精度,BigDecimal doen'st matter(。我在可观察的合成时间定义了一个阈值。换句话说,阈值在可观察的生命周期内不会改变。
我需要的是过滤掉与阈值内上次通过的值不同的值。或者改写一下:仅当值与上次传递的值不同时,才向下游传递值,使其大于阈值。
示例会更好地解释它:
Source observable values:
[5, 4, 7, 9, 15, 14, 13, 12, 11, 7, 3, 2, 1]
Filetered values with threshold = 3 :
[5, 9, 15, 11, 7, 3]
为了更好地说明它,让我们假设它是温度读数,例如。我只想过滤温度变化超过 3 度的值。第一个值总是传递并作为要比较的初始值。例如,如果第一个温度读数是21摄氏度,并且在一小时内每个后续读数都在[18..24]中,那么这些值都不应该通过下游。但是一旦它越过了这些界限,它就应该被超越并创建新的界限来比较。
问题是:如何仅使用RxJava运算符执行此操作?或者除了在 rx 管道之外存储状态之外没有其他方法(在易失性或原子引用或任何其他同步状态下,细节无关紧要(?
您可以使用scan
和distinctUntilChanged
的组合来仅使用 RX 运算符来实现此目的。
scan
允许您访问上一个和下一个值,允许您与给定的阈值进行比较。
如果未达到阈值,则可以发出以前的值。
然后,distinctUntilChanged
可以用来减少重复排放。
在 Kotlin 中,这看起来像下面这样:
val threshold = 3
val list = listOf(5, 4, 7, 9, 15, 14, 13, 12, 11, 7, 3, 2, 1)
return Observable.fromIterable<Int>(list)
.scan { previous: Int, next: Int ->
val difference = abs(next - previous)
when {
difference > threshold -> next
else -> previous
}
}.distinctUntilChanged()