我有一个从演示者到存储库的调用链,该存储库返回一个可观察量。这是代码:
主持人:
private fun getCategories() =
compositeDisposable.add(
categoriesUseCase.getCategories()
.timeout(TIMEOUT, TIMEOUT_UNIT)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(this::handleCategories, this::handleCategoriesTimeout)
)
这是用例:
fun getCategories(): Observable<List<Category>> =
repository.getCategories()
.map { it.map { Category(it.id, it.text, it.icon) } }
这是存储库://subject 是 BehaviorSubject.create((
fun getcategories(): Observable<List<DiscoverabilityCategoryElement>> =
Observable.just(storage.getCategories())
.mergeWith { subject.flatMapIterable { it.categories }.publish() }
.subscribeOn(Schedulers.io())
.doOnNext { Logger.d("Data", "next categories $it") }
.filter { it.isPresent }
.map { it.get() }
.take(1)
.doOnSubscribe { Logger.d("Data", "Subcribed categories") }
fun saveApiResult(response: Response) {//This is being called after subscribe
subject.onNext(response.categories)
subject.onComplete()
}
存储方法将始终返回 Optional.empty(((同时我正在开发(
我的问题是,即使看到 subject.onNext 被调用,该值也永远不会出现在演示者身上,我已经调试了一点并且 subject 总是返回 false 到 hasObservables,也许我在某个时候失去了我的观察者?
你为什么在那条线上称publish()
?它返回一个ConnectableObservable
,在调用connect
之前不执行任何操作。但是,这条线上没有任何内容需要共享。
试试这个:
fun getcategories(): Observable<List<DiscoverabilityCategoryElement>> =
Observable.just(storage.getCategories())
.mergeWith { subject.flatMapIterable { it.categories } } // <-------------
.subscribeOn(Schedulers.io())
.doOnNext { Logger.d("Data", "next categories $it") }
.filter { it.isPresent }
.map { it.get() }
.take(1)
.doOnSubscribe { Logger.d("Data", "Subcribed categories") }
解决方案是改变
.mergeWith { subject.flatMapIterable { it.categories }.publish() }
由
.mergeWith(subject.flatMap({ rootElement -> Observable.fromArray(element.categories.toOptional()) }))