如何使用 Realm asObservable 与 RxJava 的 concat() 运算符一起使用?



我正试图以DanLew在这里描述的方式将Realm与RxJava和Reform一起使用,将输入从Realm和Reform连接起来,但如果我将Realm添加到链中,它会被卡住

Observable.concat(countryStorage.restoreAsObservable(),
              networkService.api()
                  .getCountries()
                  .doOnNext(countryStorage::save))
              .first()
              .observeOn(AndroidSchedulers.mainThread())
              .subscribe(//never reaching here)

存储

 @Override public Observable<List<Country>> restoreAsObservable() {
        Realm realm = realmProvider.get();
        return realm.where(Country.class)
            .findAll()
            .asObservable()
            .map(countries -> return realm.copyFromRealm(countries))
            .first(countries -> return !countries.isEmpty())
            .doOnCompleted(realm::close());
      }

似乎这可能会发生,从Realm中可以观察到的是热的,但在文档中没有任何内容,我想如何将Realm与其他可观察到的组成?

更新:事实证明,它以旧的方式运行良好。关于新api的问题仍然存在。

return Observable.just(
        realm.copyFromRealm(realm.where(Country.class).findAll()))
        .filter(countries -> !countries.isEmpty())
        .doOnCompleted(realm::close);

之所以会发生这种情况,是因为countryStorage.restoreAsObservable()从未完成,如果您读取concat文档,它会明确声明:

Concat等待订阅您传递给它的每个附加Observable,直到上一个Observable完成。

相反,你可以做一些类似的事情:

    countryStorage.restoreAsObservable()
          .doOnSubscribe(() -> {
              networkService.api()
                  .getCountries()
                  .subscribe(countryStorage::save)
          })
          .observeOn(AndroidSchedulers.mainThread())
          .subscribe(//do smth)

相关内容

最新更新