RxJava 2.x - 由 PublishSubject 触发并与其他可观察量合并的 Observable 获取平面映



我有一个使用如下所示PublishSubject的分页解决方案:

private val pages: PublishSubject<Int> = PublishSubject.create()
val observable: Observable<List<Data> = pages.hide()
.filter { !inFlight }
.doOnNext { inFlight = true }
.flatMap{
getPage(it) // Returns an Observable
}
.doOnNext(::onNextPage) // inFlight gets reset here

Observable与其他Observable合并并扫描,如下所示:

fun stateObservable(): Observable<SavedState> {
return Observable.merge(listOf(firstPage(),
nextPage(),// The observable listed above
refresh()))
.scan(MyState.initialState(), StateReducer::reduce)
}

基本上,我有一个单向设置,其中每个可观察的更新都MyState借助累加器功能reduce的相关更改。

ViewModel中,这是以直接的方式消费的:

interactor.stateObservable()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeBy(onNext = ::render, onError = Timber::e)
.addTo(subscriptions)

此设置适用于firstPagerefresh(也在PublishSubject的帮助下触发(,但由于某种原因,分页解决方案在flatMap中返回getPageObservable,但随后此页面Observable永远不会被触发/订阅,并且flatMap之后的doOnNext显然也不会被调用。似乎它基本上不想订阅它,我根本不知道为什么。

getPage函数如下所示:

private fun getPage(page: Long): Observable<PartialState<SavedState>> {
return repo.getPage(page).firstOrError().toObservable()
.subscribeOn(Schedulers.io())
.map<PartialState<MyState>> { NextPageLoaded(it) }
.onErrorReturn { NextPageError(it) }
.startWith { NextPageLoading() }
}

存储库中的getPage是在RxJavaInterop的帮助下通过以下方式将 RxJava 1Observable转换为 RxJava2Observable

public io.reactivex.Observable<List<Data>> getPage(long page) {
Observable<List<Data>> observable = getPage(page)
.map(dataList -> {
if(dataList == null){
dataList = new ArrayList<>();
}
return dataList;
});
return RxJavaInterop.toV2Observable(observable);
}

我没有收到任何错误,所以你可以排除它。

我已经在 RxJava 1 中拥有相同的设置,它运行良好,现在当我迁移到 2.x 时,我希望相同的解决方案可以工作,但我完全陷入了这个分页问题,在所有其他情况下设置都按预期工作。

为了能够测试问题,我在 GitHub 上上传了一个示例项目来演示该问题。

有没有RxJava专家知道它可能是什么?:)

谢谢

我发现了一个问题:过度急切地使用{}startWith创建一个 lambda,该 lambda 在nextPageObservable中什么都不做(因此永远不会切换到页面链(。

.startWith { NextPageLoading() }

相反:

.startWith ( NextPageLoading() )

但是,通过此更改,您的代码会在其他地方产生 IAE,因为 null 值:

java.lang.IllegalArgumentException: Parameter specified as non-null is null: method kotlin.jvm.internal.Intrinsics.checkParameterIsNotNull, parameter page
at com.paginationissue.ui.list.PaginationIssueInteractor$1.invoke(Unknown Source:2)
at com.paginationissue.ui.list.PaginationIssueInteractor$1.invoke(PaginationIssueInteractor.kt:15)
at com.paginationissue.paging.PageNumberTokenStrategy.generateNextPageToken(PageNumberTokenStrategy.kt:21)
at com.paginationissue.paging.PageNumberTokenStrategy.generateNextPageToken(PageNumberTokenStrategy.kt:12)
at com.paginationissue.paging.Pager.onNextPage(Pager.kt:92)

最新更新