RxJava:数据库和远程服务器



我有一个对象列表,我想从本地数据库(如果可用)检索,或者从远程服务器检索。我正在使用RxJava Observables (SqlBrite用于数据库和Retrofit用于远程服务器)。

我的查询代码如下:

Observable<List<MyObject>> dbObservable = mDatabase
            .createQuery(MyObject.TABLE_NAME,MyObject.SELECT_TYPE_A)
            .mapToList(MyObject.LOCAL_MAPPER);
Observable<List<MyObject>> remoteObservable = mRetrofitService.getMyObjectApiService().getMyObjects();
return Observable.concat(dbObservable, remoteObservable)
    .first(new Func1<List<MyObject>, Boolean>() {
                @Override
                public Boolean call(List<MyObject> myObjects) {
                    return !myObjects.isEmpty();
                }
            });

我看到第一个可观察对象正在运行,并以空列表击中第一个方法,但随后改进的可观察对象没有运行,没有网络请求。如果我切换可观察对象的顺序,或者只是返回远程可观察对象,它会像预期的那样工作,它会到达远程服务器并返回对象列表。

为什么远程可观察对象在这种情况下无法运行?当我先将可观察对象与数据库连接,然后再将可观察对象与数据库连接时,不会调用订阅者的onNext、orError和onComplete方法。

谢谢!

Kaushik Gopal已经在他的RxJava-Android-Samples github项目中解决了这个问题。

他建议使用这种技术:

getFreshNetworkData()
          .publish(network ->
                         Observable.merge(network,
                                          getCachedDiskData().takeUntil(network)))
          .subscribeOn(Schedulers.io())
          .observeOn(AndroidSchedulers.mainThread())
          .subscribe(new Subscriber<List<MyObject>() {
              ...
          });

在您的例子中,它可能看起来像这样:

remoteObservable
      .publish(network ->
                     Observable.merge(network,
                                      dbObservable.takeUntil(network)))
      .first(myObjects -> !myObjects.isEmpty());

编辑:听起来你可能需要这个:

dbObservable
    .flatMap(localResult -> {
        if (localResult.isEmpty()) {
            return remoteObservable;
        } else {
            return Observable.just(localResult);
        }
    });

我假设您有可以从本地和远程获取数据的可观察对象,如下所示:

        final Observable<Page> localResult = mSearchLocalDataSource.search(query);
        final Observable<Page> remoteResult = mSearchRemoteDataSource.search(query)
                .doOnNext(new Action1<Page>() {
                    @Override
                    public void call(Page page) {
                        if (page != null) {
                            mSearchLocalDataSource.save(query, page);
                            mResultCache.put(query, page);
                        }
                    }
                });

然后你可以映射它们并首先get,这意味着如果本地可用,则使用本地,否则使用远程:

        return Observable.concat(localResult, remoteResult)
                .first()
                .map(new Func1<Page, Page>() {
                    @Override
                    public Page call(Page page) {
                        if (page == null) {
                            throw new NoSuchElementException("No result found!");
                        }
                        return page;
                    }
                });

并像下面这样订阅:

mCompositeSubscription.clear();
        final Subscription subscription = mSearchRepository.search(this.mQuery)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Page>() {
                    @Override
                    public void onCompleted() {
                        // Completed
                    }
                    @Override
                    public void onError(Throwable e) {
                        mView.onDefaultMessage(e.getMessage());
                    }
                    @Override
                    public void onNext(Page page) {
                        mView.onDefaultMessage(page.getContent());
                    }
                });
        mCompositeSubscription.add(subscription);

有关更多细节或示例,您可以查看我的github repo:https://github.com/savepopulation/wikilight

祝你好运!

编辑:

你可以像下面这样尝试一个局部可观察对象。简单地说,它检查是否有记录并返回一个空的可观察对象。

@Override
public Observable<Page> search(@NonNull final String query) {
    return Observable.create(new Observable.OnSubscribe<Page>() {
        @Override
        public void call(Subscriber<? super Page> subscriber) {
            final Realm realm = Realm.getInstance(mRealmConfiguration);
            final Page page = realm.where(Page.class)
                    .equalTo("query", query)
                    .findFirst();
            if (page != null && page.isLoaded() && page.isValid()) {
                Log.i("data from", "realm");
                subscriber.onNext(realm.copyFromRealm(page));
            } else {
                Observable.empty();
            }
            subscriber.onCompleted();
            realm.close();
        }
    });
}
编辑2:

当你从本地concat和first返回null将不工作,你的远程将不会被调用,因为null意味着可观察对象返回null,但仍然可以观察。当你返回observable。用concat空,首先这意味着可观察对象不能从本地发出任何东西,所以它可以从远程发出。

相关内容

  • 没有找到相关文章

最新更新