我正在尝试实现RxJava + Realm + Retrofit + Repository Pattern
这是我的本地实现:
@Override
public Observable<Page> search(@NonNull final String query) {
return Realm.getDefaultInstance().where(Page.class)
.equalTo("query", query)
.findAll()
.asObservable()
.cast(Page.class);
}
这是我的远程实现:
@Override
public Observable<Page> search(@NonNull String query) {
return mWikiServices.search(query).map(new Func1<Result, Page>() {
@Override
public Page call(Result result) {
final List<Page> pages = new ArrayList<>(result.getQuery().getPages().values());
return pages.get(0);
}
});
}
这是我的repo实现:
final Observable<Page> localResult = mSearchLocalDataSource.search(query);
final Observable<Page> remoteResult = mSearchRemoteDataSource.search(query)
.doOnNext(new Action1<Page>() {
@Override
public void call(Page page) {
//mSearchLocalDataSource.save(query, page);
//mResultCache.put(query, page);
}
});
return Observable.concat(localResult, remoteResult)
.first()
.doOnError(new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
throwable.printStackTrace();
}
});
最后这是我在presenter中的订阅
final Subscription subscription = mSearchRepository.search(this.mQuery)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Page>() {
@Override
public void onCompleted() {
// Completed
}
@Override
public void onError(Throwable e) {
mView.onDefaultMessage(e.getMessage());
}
@Override
public void onNext(Page page) {
mView.onDefaultMessage(page.getContent());
}
});
mCompositeSubscription.add(subscription);
当我运行代码时,我得到这个异常:从不正确的线程访问Realm。Realm对象只能在创建它们的线程上访问。
我在Realm Github repo中尝试了官方解决方案,但没有一个有效。仍然得到这个异常。
我认为我得到这个异常,因为我订阅了一个io线程。在主线程中创建Realm实例。所以我得到了这个异常
是否有实现方案?
谢谢。
首先让我们记住这个问题:当我订阅一个Schedulars时。io线程,并尝试从领域或改造我得到"realm访问从不正确的线程。领域对象只能在创建它们的线程上访问"
在这种情况下,主要问题是我在主线程中创建了Realm实例,但试图从工作线程访问它。
我们可以在主线程上订阅realm,但这不是一个好的做法。当我使用
realm.where("query",query).findFirstAsync().asObservable();
以为例,在Github repo中,我被卡在
处。Observable.concat(localResult, remoteResult).first();
我的解决方案是什么?
在我们的Repository实现中,我们仍然有两个可观察对象用于远程和本地,如下所示:
final Observable<Page> localResult = mSearchLocalDataSource.search(query);
final Observable<Page> remoteResult = mSearchRemoteDataSource.search(query)
.doOnNext(new Action1<Page>() {
@Override
public void call(Page page) {
if (page != null) {
mSearchLocalDataSource.save(query, page);
mResultCache.put(query, page);
}
}
});
请注意,当我们从远程获取数据时,我们会保存数据并缓存到内存中。
如果不能从缓存中获取数据,我们仍然连接两个可观察对象。
return Observable.concat(localResult, remoteResult)
.first()
.map(new Func1<Page, Page>() {
@Override
public Page call(Page page) {
if (page == null) {
throw new NoSuchElementException("No result found!");
}
return page;
}
});
使用concat,我们尝试从realm获取数据,如果不能,我们尝试从remote获取数据。
这是远程可观察的实现:
@Override
public Observable<Page> search(@NonNull String query) {
return mWikiServices.search(query).flatMap(new Func1<Result, Observable<Page>>() {
@Override
public Observable<Page> call(Result result) {
final ArrayList<Page> pages = new ArrayList<>(result.getQuery().getPages().values());
Log.i("data from", "remote");
return Observable.from(pages).first();
}
});
}
这里是本地源代码实现:
@Override
public Observable<Page> search(@NonNull final String query) {
return Observable.create(new Observable.OnSubscribe<Page>() {
@Override
public void call(Subscriber<? super Page> subscriber) {
final Realm realm = Realm.getInstance(mRealmConfiguration);
final Page page = realm.where(Page.class)
.equalTo("query", query)
.findFirst();
if (page != null && page.isLoaded() && page.isValid()) {
Log.i("data from", "realm");
subscriber.onNext(realm.copyFromRealm(page));
} else {
Observable.empty();
}
subscriber.onCompleted();
realm.close();
}
});
}
关键是我创建了一个新的Observable并从realm中获取数据。因此,我们创建领域实例并在同一线程中使用它。(io线程)。我们创建对象的副本来摆脱非法的状态异常。
当我们从realm if null中获取数据时,我们返回一个空的可观察对象,以避免在concat操作中卡住。
如果我们得到的页面是有效的,我们发送给订阅者并完成操作。
下面是我们如何保存从remote到realm的数据:@Override
public void save(@NonNull String query, @NonNull Page page) {
final Realm realm = Realm.getInstance(mRealmConfiguration);
realm.beginTransaction();
final Page p = realm.createObject(Page.class);
p.setQuery(query);
p.setId(page.getId());
p.setTitle(page.getTitle());
p.setContent(page.getContent());
realm.copyToRealmOrUpdate(p);
realm.commitTransaction();
realm.close();
}
下面是示例源代码。https://github.com/savepopulation/wikilight
好运。
从技术上讲,两种解决方案都有效,这是一个问题,正如Viraj Tank所说-"安全集成"与"深度集成"。
但是,适当的深度集成方法应该是从服务中单独下载,并在底层Realm中侦听更改的订阅者。(realmResults.asObservable().subscribe()
).
老实说,我忍不住觉得这在概念上有缺陷。
首先,Realm查询在创建时在主线程上执行@Override
public Observable<Page> search(@NonNull final String query) {
return Realm.getDefaultInstance().where(Page.class)
创建一个永远不会关闭的Realm实例。
此外,它将asObservable()
与first()
结合使用,这使我想知道为什么您首先通过asObservable()
向结果添加更改侦听器,而不是仅仅调用Observable.just(results)
。
然后,似乎远程数据源获取并将元素添加到Realm并立即显示下载的项,而不是由Realm通过更改侦听器直接管理元素,从而提供自动更新。在这种情况下,我真的不确定Realm在做什么。
无论如何,我最初的猜测是,您可能能够使您的代码与以下行 一起工作final Observable<Page> localResult = mSearchLocalDataSource.search(query)
.subscribeOn(AndroidSchedulers.mainThread());
final Observable<Page> remoteResult = mSearchRemoteDataSource.search(query)
.subscribeOn(Schedulers.io())
.doOnNext(new Action1<Page>() {
@Override
public void call(Page page) {
//mSearchLocalDataSource.save(query, page);
//mResultCache.put(query, page);
}
});
考虑到你似乎不依赖Realm的自动更新功能,你可以考虑使用realm.copyFromRealm(obj)
来创建一个可以在线程之间传递的非托管副本。
但实际上,为了正确使用Realm,您应该有两个订阅——从网络到Realm的单个数据流;以及RealmResults<Page>.asObservable()
的订阅,它会在网页被网络可观察对象写入时通知你——看看Christian Melchior的帖子。
就我个人而言,我跳过了Realm可观察对象,因为RealmRecyclerViewAdapter
处理它。因此,如果你在一个RecyclerView中显示多个元素,那么甚至不需要Realm可观察对象,因为RealmRecylerViewAdapter
通过RealmChangeListener
管理其自动更新,而不依赖于asObservable()
来完成它。
编辑:
在将提问者的存储库分支为https://github.com/Zhuinden/wikilight之后,显然我一直都是对的。简单的零复制解决方案是为本地可观察对象添加subscribeOn(AndroidSchedulers.mainThread())
。
所以令人惊讶的是,没有太大的变化。
final Observable<Page> localResult = mSearchLocalDataSource.search(query).filter(new Func1<Page, Boolean>() {
@Override
public Boolean call(Page page) {
return page != null;
}
}).subscribeOn(AndroidSchedulers.mainThread());
final Observable<Page> remoteResult = mSearchRemoteDataSource.search(query).subscribeOn(Schedulers.io())
.doOnNext(new Action1<Page>() {
@Override
public void call(Page page) {
if (page != null) {
mSearchLocalDataSource.save(query, page);
// mResultCache.put(query, page);
}
}
});
return Observable.concat(localResult, remoteResult)
.first()
.map(new Func1<Page, Page>() {
@Override
public Page call(Page page) {
if (page == null) {
throw new NoSuchElementException("No result found!");
}
return page;
}
});
但公平地说,最初的解决方案似乎将自动更新从图片中删除,因此解决方案中没有使用RealmChangeListener
s, RealmObject.asObservable()
也没有使用,因此copyFromRealm()
确实更有意义。要使自动更新工作,这个
@Override
public Observable<Page> search(@NonNull final String query) {
return Observable.create(new Observable.OnSubscribe<Page>() {
@Override
public void call(Subscriber<? super Page> subscriber) {
Realm realm = null;
try {
realm = mRealmManager.getRealm();
final Page page = realm.where(Page.class).equalTo("query", query).findFirst();
if(page != null && page.isLoaded() && page.isValid()) {
Log.i("data from", "realm");
subscriber.onNext(page);
} else {
Log.i("data is", "empty");
Observable.empty();
}
subscriber.onCompleted();
} finally {
if(realm != null) {
mRealmManager.closeRealm(realm);
}
}
}
});
}
应该替换为:
@Override
public Observable<Page> search(@NonNull final String query) {
Realm realm = mRealmManager.getRealm(); // UI thread only!
final Page page = realm.where(Page.class).equalTo("query", query).findFirst();
if(page != null) {
Log.i("data from", "realm");
return page.asObservable();
} else {
Log.i("data is", "empty");
return Observable.empty();
}
}
最后,一些额外的架构可以使它更好,但我想我还是睡觉吧。