我正试图以DanLew在这里描述的方式将Realm与RxJava和Reform一起使用,将输入从Realm和Reform连接起来,但如果我将Realm添加到链中,它会被卡住
Observable.concat(countryStorage.restoreAsObservable(),
networkService.api()
.getCountries()
.doOnNext(countryStorage::save))
.first()
.observeOn(AndroidSchedulers.mainThread())
.subscribe(//never reaching here)
存储
@Override public Observable<List<Country>> restoreAsObservable() {
Realm realm = realmProvider.get();
return realm.where(Country.class)
.findAll()
.asObservable()
.map(countries -> return realm.copyFromRealm(countries))
.first(countries -> return !countries.isEmpty())
.doOnCompleted(realm::close());
}
似乎这可能会发生,从Realm中可以观察到的是热的,但在文档中没有任何内容,我想如何将Realm与其他可观察到的组成?
更新:事实证明,它以旧的方式运行良好。关于新api的问题仍然存在。
return Observable.just(
realm.copyFromRealm(realm.where(Country.class).findAll()))
.filter(countries -> !countries.isEmpty())
.doOnCompleted(realm::close);
之所以会发生这种情况,是因为countryStorage.restoreAsObservable()
从未完成,如果您读取concat文档,它会明确声明:
Concat等待订阅您传递给它的每个附加Observable,直到上一个Observable完成。
相反,你可以做一些类似的事情:
countryStorage.restoreAsObservable()
.doOnSubscribe(() -> {
networkService.api()
.getCountries()
.subscribe(countryStorage::save)
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(//do smth)