我有一个对象列表,我想从本地数据库(如果可用)检索,或者从远程服务器检索。我正在使用RxJava Observables (SqlBrite用于数据库和Retrofit用于远程服务器)。
我的查询代码如下:
Observable<List<MyObject>> dbObservable = mDatabase
.createQuery(MyObject.TABLE_NAME,MyObject.SELECT_TYPE_A)
.mapToList(MyObject.LOCAL_MAPPER);
Observable<List<MyObject>> remoteObservable = mRetrofitService.getMyObjectApiService().getMyObjects();
return Observable.concat(dbObservable, remoteObservable)
.first(new Func1<List<MyObject>, Boolean>() {
@Override
public Boolean call(List<MyObject> myObjects) {
return !myObjects.isEmpty();
}
});
我看到第一个可观察对象正在运行,并以空列表击中第一个方法,但随后改进的可观察对象没有运行,没有网络请求。如果我切换可观察对象的顺序,或者只是返回远程可观察对象,它会像预期的那样工作,它会到达远程服务器并返回对象列表。
为什么远程可观察对象在这种情况下无法运行?当我先将可观察对象与数据库连接,然后再将可观察对象与数据库连接时,不会调用订阅者的onNext、orError和onComplete方法。
谢谢!
Kaushik Gopal已经在他的RxJava-Android-Samples github项目中解决了这个问题。
他建议使用这种技术:
getFreshNetworkData()
.publish(network ->
Observable.merge(network,
getCachedDiskData().takeUntil(network)))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<List<MyObject>() {
...
});
在您的例子中,它可能看起来像这样:
remoteObservable
.publish(network ->
Observable.merge(network,
dbObservable.takeUntil(network)))
.first(myObjects -> !myObjects.isEmpty());
编辑:听起来你可能需要这个:
dbObservable
.flatMap(localResult -> {
if (localResult.isEmpty()) {
return remoteObservable;
} else {
return Observable.just(localResult);
}
});
我假设您有可以从本地和远程获取数据的可观察对象,如下所示:
final Observable<Page> localResult = mSearchLocalDataSource.search(query);
final Observable<Page> remoteResult = mSearchRemoteDataSource.search(query)
.doOnNext(new Action1<Page>() {
@Override
public void call(Page page) {
if (page != null) {
mSearchLocalDataSource.save(query, page);
mResultCache.put(query, page);
}
}
});
然后你可以映射它们并首先get,这意味着如果本地可用,则使用本地,否则使用远程:
return Observable.concat(localResult, remoteResult)
.first()
.map(new Func1<Page, Page>() {
@Override
public Page call(Page page) {
if (page == null) {
throw new NoSuchElementException("No result found!");
}
return page;
}
});
并像下面这样订阅:
mCompositeSubscription.clear();
final Subscription subscription = mSearchRepository.search(this.mQuery)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Page>() {
@Override
public void onCompleted() {
// Completed
}
@Override
public void onError(Throwable e) {
mView.onDefaultMessage(e.getMessage());
}
@Override
public void onNext(Page page) {
mView.onDefaultMessage(page.getContent());
}
});
mCompositeSubscription.add(subscription);
有关更多细节或示例,您可以查看我的github repo:https://github.com/savepopulation/wikilight
祝你好运!
编辑:你可以像下面这样尝试一个局部可观察对象。简单地说,它检查是否有记录并返回一个空的可观察对象。
@Override
public Observable<Page> search(@NonNull final String query) {
return Observable.create(new Observable.OnSubscribe<Page>() {
@Override
public void call(Subscriber<? super Page> subscriber) {
final Realm realm = Realm.getInstance(mRealmConfiguration);
final Page page = realm.where(Page.class)
.equalTo("query", query)
.findFirst();
if (page != null && page.isLoaded() && page.isValid()) {
Log.i("data from", "realm");
subscriber.onNext(realm.copyFromRealm(page));
} else {
Observable.empty();
}
subscriber.onCompleted();
realm.close();
}
});
}
编辑2:当你从本地concat和first返回null将不工作,你的远程将不会被调用,因为null意味着可观察对象返回null,但仍然可以观察。当你返回observable。用concat空,首先这意味着可观察对象不能从本地发出任何东西,所以它可以从远程发出。