背景
我正在我的应用程序中使用Realm。当数据被加载时,它会经历密集的处理,因此处理发生在后台线程上。
使用的编码模式是工作单元模式,Realm只存在于DataManager下的存储库中。这里的想法是,每个存储库都可以有不同的数据库/文件存储解决方案。
我尝试过的
下面是一些类似于我在FooRespository类中的代码的示例。
这里的想法是获得Realm的一个实例,用于查询领域中感兴趣的对象,返回它们并关闭领域实例。请注意,这是同步的,最后将对象从Realm复制到非托管状态。
public Observable<List<Foo>> getFoosById(List<String> fooIds) {
Realm realm = Realm.getInstance(fooRealmConfiguration);
RealmQuery<Foo> findFoosByIdQuery = realm.where(Foo.class);
for(String id : fooIds) {
findFoosByIdQuery.equalTo(Foo.FOO_ID_FIELD_NAME, id);
findFoosByIdQuery.or();
}
return findFoosByIdQuery
.findAll()
.asObservable()
.doOnUnsubscribe(realm::close)
.filter(RealmResults::isLoaded)
.flatMap(foos -> Observable.just(new ArrayList<>(realm.copyFromRealm(foos))));
}
此代码稍后通过RxJava:与重处理代码一起使用
dataManager.getFoosById(foo)
.flatMap(this::processtheFoosInALongRunningProcess)
.subscribeOn(Schedulers.io()) //could be Schedulers.computation() etc
.subscribe(tileChannelSubscriber);
在阅读了这些文档后,我认为以上内容应该有效,因为它不是异步的,因此不需要活套线程。我在同一个线程中获得领域的实例,因此它不会在线程之间传递,对象也不会。
问题
当执行以上操作时,我得到
来自错误线程的领域访问。只能访问领域对象在创建的线程上。
这似乎不对。我唯一能想到的是,Realm实例池正在为我提供一个使用主线程从另一个进程创建的现有实例。
Kay so
return findFoosByIdQuery
.findAll()
.asObservable()
这种情况发生在UI线程上,因为这是您从最初的调用它的地方
.subscribeOn(Schedulers.io())
啊啊,然后你在Schedulers.io()
上修改它们。
不,那不是同一根线!
尽管我不喜欢从零拷贝数据库中复制的方法,但由于滥用realmResults.asObservable()
,您当前的方法充满了问题,因此这里有一个关于您的代码应该是什么的剧透:
public Observable<List<Foo>> getFoosById(List<String> fooIds) {
return Observable.defer(() -> {
try(Realm realm = Realm.getInstance(fooRealmConfiguration)) { //try-finally also works
RealmQuery<Foo> findFoosByIdQuery = realm.where(Foo.class);
for(String id : fooIds) {
findFoosByIdQuery.equalTo(FooFields.ID, id);
findFoosByIdQuery.or(); // please guarantee this works?
}
RealmResults<Foo> results = findFoosByIdQuery.findAll();
return Observable.just(realm.copyFromRealm(results));
}
}).subscribeOn(Schedulers.io());
}
请注意,您正在所有RxJava处理管道之外创建实例。因此,在调用getFoosById()
.时,在主线程上(或您所在的任何线程上(
方法返回Observable并不意味着它在另一个线程上运行。只有由getFoosById()
方法的最后一条语句创建的Observable的处理管道在正确的线程上运行(filter()
、flatMap()
以及调用方完成的所有处理(。
因此,您必须确保getFoosById()
的调用已经在Schedulers.io()
使用的线程上完成。
实现这一点的一种方法是使用Observable.defer()
:
Observable.defer(() -> dataManager.getFoosById(foo))
.flatMap(this::processtheFoosInALongRunningProcess)
.subscribeOn(Schedulers.io()) //could be Schedulers.computation() etc
.subscribe(tileChannelSubscriber);