我正在学习 Android Kotlin 中的 RxJava2,并尝试编写一个函数,把多个 Observable 用 zip 合并后交给一个 Observer 订阅,但代码似乎有错误,谁能帮忙看看?
首先,我写了一个 zip 两个 Observable 的版本,它可以正常工作;但当我想用 vararg 把它扩展到任意数量的 Observable 时,就失败了。
/**
 * Zips two observables pairwise and hands every combined pair to [observer] as a list.
 *
 * Subscription and unsubscription are scheduled on `Schedulers.io()`, and the combined
 * lists are observed on `AndroidSchedulers.mainThread()`.
 *
 * @param observable1 source whose item becomes the first element of each emitted list
 * @param observable2 source whose item becomes the second element of each emitted list
 * @param observer receiver of the combined lists
 */
fun <T> ApiSubscribeZip2(observable1: Observable<T>, observable2: Observable<T>, observer: Observer<List<T>>) {
    // No safe calls and no cast are needed here: the chain is used as non-null, and
    // Observer<List<T>> is already accepted by subscribe().
    Observable.zip(observable1, observable2, BiFunction<T, T, List<T>> { first, second -> zipAdd(first, second) })
        .subscribeOn(Schedulers.io())
        .unsubscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(observer)
}
// NOTE(review): this is the failing vararg attempt from the question, kept exactly as posted.
// It passes the vararg array itself together with a Function<T, List<T>> to Observable.zip;
// presumably no zip overload accepts that combination (the working versions further down
// pass a List of sources and a zipper over the emitted array) — confirm against the RxJava 2 API.
// Also note that `observer` is never subscribed anywhere in this body.
fun <T> ApiSubscribeZipN(vararg observable: Observable<T>?, observer: Observer<List<T>>) {
Observable.zip(observable, Function<T, List<T>> { it ->
// `it` is handed to zipAdd as a single vararg element, so the result would be a one-item list.
zipAdd(it)
})
}
/**
 * Collects the given values, in argument order, into a freshly allocated [ArrayList].
 *
 * @param observableType the values to gather; may be empty
 * @return a new list containing every argument in the order it was passed
 */
private fun <T> zipAdd(vararg observableType: T): List<T> =
    observableType.toCollection(ArrayList<T>())
ApiSubscribeZipN 处编译报错:“None of the following functions can be called with the arguments supplied.”(以下函数都不能使用所提供的参数调用。)
你可以像这样使用 Observable.zipIterable:
/**
 * Zips any number of observables and delivers each combined row to [observer] as a list.
 *
 * Null entries in [observable] are dropped before zipping. Work is subscribed and
 * unsubscribed on `Schedulers.io()`, and results are observed on
 * `AndroidSchedulers.mainThread()`.
 *
 * @param observable the sources to combine; null entries are ignored
 * @param observer receiver of the combined lists
 */
@Suppress("UNCHECKED_CAST")
fun <T> ApiSubscribeZipN(vararg observable: Observable<T>?, observer: Observer<List<T>>) {
    Observable.zipIterable<T, List<T>>(
        observable.filterNotNull(),
        // The zipper receives the combined row as an untyped array; every source is an
        // Observable<T>, so the elements are cast back to T.
        { emitted -> emitted.toList() as List<T> },
        false, // delayError
        100 // bufferSize
    )
        .subscribeOn(Schedulers.io())
        .unsubscribeOn(Schedulers.io())
        // AndroidSchedulers has no io() scheduler; results belong on the main thread.
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(observer)
}
或者试试下面这种写法:
import io.reactivex.Observable
import io.reactivex.functions.Function
import org.junit.Test
/**
 * Builds an observable that zips all non-null sources and emits each combined row as a list.
 *
 * Nothing is subscribed here; the caller decides how and where to subscribe to the result.
 *
 * @param observable the sources to combine; null entries are filtered out
 * @return an observable emitting one list per zipped row
 */
@Suppress("UNCHECKED_CAST")
fun <T> apiSubscribeZipN(vararg observable: Observable<T>?): Observable<List<T>> =
    Observable.zip(
        observable.filterNotNull(),
        // Each row arrives as an untyped array, so every element is cast back to T.
        Function { row -> row.map { element -> element as T } }
    )
测试
/** Checks that two two-item sources are zipped position by position into two lists. */
@Test
fun whatever() {
    // Each source is merged with never() — presumably so it stays open after its two items.
    val firstSource = Observable.fromArray("test11", "test12").mergeWith(Observable.never())
    val secondSource = Observable.fromArray("test21", "test22").mergeWith(Observable.never())

    val zippedObserver = apiSubscribeZipN(firstSource, secondSource).test()

    zippedObserver.assertValues(listOf("test11", "test21"), listOf("test12", "test22"))
}