带有Realm的RxJava多线程-来自错误线程的Realm访问



背景

我正在我的应用程序中使用Realm。当数据被加载时,它会经历密集的处理,因此处理发生在后台线程上。

使用的编码模式是工作单元模式,Realm只存在于DataManager下的存储库中。这里的想法是,每个存储库都可以有不同的数据库/文件存储解决方案。

我尝试过的

下面是一些类似于我在FooRespository类中的代码的示例。

这里的想法是获得Realm的一个实例,用于查询领域中感兴趣的对象,返回它们并关闭领域实例。请注意,这是同步的,最后将对象从Realm复制到非托管状态。

public Observable<List<Foo>> getFoosById(List<String> fooIds) {
    Realm realm = Realm.getInstance(fooRealmConfiguration);
    RealmQuery<Foo> findFoosByIdQuery = realm.where(Foo.class);
    for(String id : fooIds) {
        findFoosByIdQuery.equalTo(Foo.FOO_ID_FIELD_NAME, id);
        findFoosByIdQuery.or();
    }
    return findFoosByIdQuery
            .findAll()
            .asObservable()
            .doOnUnsubscribe(realm::close)
            .filter(RealmResults::isLoaded)
            .flatMap(foos -> Observable.just(new ArrayList<>(realm.copyFromRealm(foos))));
}

此代码稍后通过RxJava:与重处理代码一起使用

dataManager.getFoosById(foo)
            .flatMap(this::processtheFoosInALongRunningProcess)
            .subscribeOn(Schedulers.io()) //could be Schedulers.computation() etc
            .subscribe(tileChannelSubscriber);

在阅读了这些文档后,我认为以上内容应该有效,因为它不是异步的,因此不需要活套线程。我在同一个线程中获得领域的实例,因此它不会在线程之间传递,对象也不会。

问题

当执行以上操作时,我得到

来自错误线程的领域访问。只能访问领域对象在创建的线程上。

这似乎不对。我唯一能想到的是,Realm实例池正在为我提供一个使用主线程从另一个进程创建的现有实例。

Kay so

return findFoosByIdQuery
        .findAll()
        .asObservable()

这种情况发生在UI线程上,因为这是您从最初的调用它的地方

.subscribeOn(Schedulers.io())

啊啊,然后你在Schedulers.io()上修改它们。

不,那不是同一根线!

尽管我不喜欢从零拷贝数据库中复制的方法,但由于滥用realmResults.asObservable(),您当前的方法充满了问题,因此这里有一个关于您的代码应该是什么的剧透:

public Observable<List<Foo>> getFoosById(List<String> fooIds) {
   return Observable.defer(() -> {
       try(Realm realm = Realm.getInstance(fooRealmConfiguration)) { //try-finally also works
           RealmQuery<Foo> findFoosByIdQuery = realm.where(Foo.class);
           for(String id : fooIds) {
               findFoosByIdQuery.equalTo(FooFields.ID, id);
               findFoosByIdQuery.or(); // please guarantee this works?
           }
           RealmResults<Foo> results = findFoosByIdQuery.findAll();
           return Observable.just(realm.copyFromRealm(results));
       }
   }).subscribeOn(Schedulers.io());
}

请注意,您正在所有RxJava处理管道之外创建实例。因此,在调用getFoosById().时,在主线程上(或您所在的任何线程上(

方法返回Observable并不意味着它在另一个线程上运行。只有由getFoosById()方法的最后一条语句创建的Observable的处理管道在正确的线程上运行(filter()flatMap()以及调用方完成的所有处理(。

因此,您必须确保getFoosById()的调用已经在Schedulers.io()使用的线程上完成。

实现这一点的一种方法是使用Observable.defer():

Observable.defer(() -> dataManager.getFoosById(foo))
            .flatMap(this::processtheFoosInALongRunningProcess)
            .subscribeOn(Schedulers.io()) //could be Schedulers.computation() etc
            .subscribe(tileChannelSubscriber);

相关内容

最新更新