I 实现了一个名为 "FilterByLatestFrom" 的伪运算符作为 kotlin 的扩展函数。
我使用此运算符编写了下一个代码:
fun testFilterByLatestFromOperator(){
val observableA : Observable<Int> = Observable.fromArray(1,2,3,4,5,6,7,8,9,10)
val observableC : PublishSubject<Int> = PublishSubject.create()
val observableB : Observable<Int> = Observable.just(2).mergeWith(observableC)
observableB.subscribe { println("observableB onNext: $it") }
observableA
.subscribe({ println("Original : $it")})
observableA.filterByLatestFrom(observableB, BiFunction { aVal, bVal -> aVal%bVal==0 })
.subscribe({ println("Result A : $it") })
observableC.onNext(3)
observableA.filterByLatestFrom(observableB, BiFunction { aVal, bVal -> aVal%bVal==0 })
.subscribe({ println("Result AC : $it") })
}
输出为:
observableB onNext: 2
Original : 1
Original : 2
Original : 3
Original : 4
Original : 5
Original : 6
Original : 7
Original : 8
Original : 9
Original : 10
Result A : 2
Result A : 4
Result A : 6
Result A : 8
Result A : 10
observableB onNext: 3
Result AC : 2
Result AC : 4
Result AC : 6
Result AC : 8
Result AC : 10
我希望过滤器操作员将根据可观察 B 的最后一个值过滤 obsA。它适用于第一个块,但是当我使用新值添加 On-next 时,它不会改变结果(使用原始可观察量的相同最后一个值(。
这是 FilterByLatestFrom impl(它的设计也是从 Java 使用的(带有 compose(:
class FilterByLatestFrom<T,U>(private val observable: Observable<T>, private val biFunction: BiFunction<U, T, Boolean>){
fun filter() : ObservableTransformer<U,U> = ObservableTransformer {
it
.withLatestFrom(
observable,
BiFunction<U,T,Pair<U,Boolean>> {
u, t -> Pair(u,biFunction.apply(u,t))
})
.filter { it.second }
.map { it.first }
}
}
fun <T,U> Observable<U>.filterByLatestFrom(observable: Observable<T>, biFunction: BiFunction<U, T, Boolean>) : Observable<U> =
this.compose(FilterByLatestFrom(observable,biFunction).filter())
我错过了什么?
编辑:我想我发现了问题:发布主题应该是行为主题。 并且合并函数应该是 conacat 以保证 obsC 将在 obsB 之后发出。
您的伪运算符filterByLatestFrom
很好,问题出在测试中,PublishSubject
只会发出后续项目,因此当您在上次订阅("结果 AC"(中时,observableB
只会发出 2,因为observableC
已经发出了 3,并且不会将其重播到observableB
(使用 merge
(。
只需将observableC.onNext(3)
移动到最后一个订阅(最后一行(之后,您应该会看到预期的行为。
编辑:也更改为PublishSubject
,就像您解决了相同的问题一样(主题会将最后一个值重播到新订阅(