从合并的 Observable.just 和 Subject 可观察,不发出任何内容



我有一个从演示者到存储库的调用链,该存储库返回一个可观察量。这是代码:

主持人:

private fun getCategories() =
        compositeDisposable.add(
            categoriesUseCase.getCategories()
                .timeout(TIMEOUT, TIMEOUT_UNIT)
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(this::handleCategories, this::handleCategoriesTimeout)
        )

这是用例:

fun getCategories(): Observable<List<Category>> =
        repository.getCategories()
            .map { it.map { Category(it.id, it.text, it.icon) } }

这是存储库://subject 是 BehaviorSubject.create((

    fun getcategories(): Observable<List<DiscoverabilityCategoryElement>> =
            Observable.just(storage.getCategories())
                .mergeWith { subject.flatMapIterable { it.categories }.publish() }
                .subscribeOn(Schedulers.io())
                .doOnNext { Logger.d("Data", "next categories $it") }
                .filter { it.isPresent }
                .map { it.get() }
                .take(1)
                .doOnSubscribe { Logger.d("Data", "Subcribed categories") }

   fun saveApiResult(response: Response) {//This is being called after subscribe
        subject.onNext(response.categories)
        subject.onComplete()
    }

存储方法将始终返回 Optional.empty(((同时我正在开发(

我的问题是,即使看到 subject.onNext 被调用,该值也永远不会出现在演示者身上,我已经调试了一点并且 subject 总是返回 false 到 hasObservables,也许我在某个时候失去了我的观察者?

你为什么在那条线上称publish()?它返回一个ConnectableObservable,在调用connect之前不执行任何操作。但是,这条线上没有任何内容需要共享。

试试这个:

fun getcategories(): Observable<List<DiscoverabilityCategoryElement>> =
        Observable.just(storage.getCategories())
            .mergeWith { subject.flatMapIterable { it.categories } } // <-------------
            .subscribeOn(Schedulers.io())
            .doOnNext { Logger.d("Data", "next categories $it") }
            .filter { it.isPresent }
            .map { it.get() }
            .take(1)
            .doOnSubscribe { Logger.d("Data", "Subcribed categories") }

解决方案是改变

 .mergeWith { subject.flatMapIterable { it.categories }.publish() }

.mergeWith(subject.flatMap({ rootElement -> Observable.fromArray(element.categories.toOptional()) }))

相关内容

最新更新