Android Realm + RxJava -从不正确的线程访问Realm.领域对象只能在创建它们的线程上访问



我正在尝试实现RxJava + Realm + Retrofit + Repository Pattern

这是我的本地实现:

@Override
public Observable<Page> search(@NonNull final String query) {
        return Realm.getDefaultInstance().where(Page.class)
                .equalTo("query", query)
                .findAll()
                .asObservable()
                .cast(Page.class);
    }

这是我的远程实现:

 @Override
 public Observable<Page> search(@NonNull String query) {
        return mWikiServices.search(query).map(new Func1<Result, Page>() {
            @Override
            public Page call(Result result) {
                final List<Page> pages = new ArrayList<>(result.getQuery().getPages().values());
                return pages.get(0);
            }
        });
    }

这是我的repo实现:

 final Observable<Page> localResult = mSearchLocalDataSource.search(query);
 final Observable<Page> remoteResult = mSearchRemoteDataSource.search(query)
                .doOnNext(new Action1<Page>() {
                    @Override
                    public void call(Page page) {
                        //mSearchLocalDataSource.save(query, page);
                        //mResultCache.put(query, page);
                    }
                });
        return Observable.concat(localResult, remoteResult)
                .first()
                .doOnError(new Action1<Throwable>() {
                    @Override
                    public void call(Throwable throwable) {
                        throwable.printStackTrace();
                    }
                });

最后这是我在presenter中的订阅

final Subscription subscription = mSearchRepository.search(this.mQuery)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Page>() {
                    @Override
                    public void onCompleted() {
                        // Completed
                    }
                    @Override
                    public void onError(Throwable e) {
                        mView.onDefaultMessage(e.getMessage());
                    }
                    @Override
                    public void onNext(Page page) {
                        mView.onDefaultMessage(page.getContent());
                    }
                });
        mCompositeSubscription.add(subscription);

当我运行代码时,我得到这个异常:从不正确的线程访问Realm。Realm对象只能在创建它们的线程上访问。

我在Realm Github repo中尝试了官方解决方案,但没有一个有效。仍然得到这个异常。

我认为我得到这个异常,因为我订阅了一个io线程。在主线程中创建Realm实例。所以我得到了这个异常

是否有实现方案?

谢谢。

经过长时间的研究,我找到了解决办法。

首先让我们记住这个问题:当我订阅一个Schedulars时。io线程,并尝试从领域或改造我得到"realm访问从不正确的线程。领域对象只能在创建它们的线程上访问"

在这种情况下,主要问题是我在主线程中创建了Realm实例,但试图从工作线程访问它。

我们可以在主线程上订阅realm,但这不是一个好的做法。当我使用

realm.where("query",query).findFirstAsync().asObservable();

为例,在Github repo中,我被卡在

处。
Observable.concat(localResult, remoteResult).first();

我的解决方案是什么?

在我们的Repository实现中,我们仍然有两个可观察对象用于远程和本地,如下所示:

final Observable<Page> localResult = mSearchLocalDataSource.search(query);
final Observable<Page> remoteResult = mSearchRemoteDataSource.search(query)
                .doOnNext(new Action1<Page>() {
                    @Override
                    public void call(Page page) {
                        if (page != null) {
                            mSearchLocalDataSource.save(query, page);
                            mResultCache.put(query, page);
                        }
                    }
                });

请注意,当我们从远程获取数据时,我们会保存数据并缓存到内存中。

如果不能从缓存中获取数据,我们仍然连接两个可观察对象。

return Observable.concat(localResult, remoteResult)
                .first()
                .map(new Func1<Page, Page>() {
                    @Override
                    public Page call(Page page) {
                        if (page == null) {
                            throw new NoSuchElementException("No result found!");
                        }
                        return page;
                    }
                });

使用concat,我们尝试从realm获取数据,如果不能,我们尝试从remote获取数据。

这是远程可观察的实现:

@Override
public Observable<Page> search(@NonNull String query) {
        return mWikiServices.search(query).flatMap(new Func1<Result, Observable<Page>>() {
            @Override
            public Observable<Page> call(Result result) {
                final ArrayList<Page> pages = new ArrayList<>(result.getQuery().getPages().values());
                Log.i("data from", "remote");
                return Observable.from(pages).first();
            }
        });
    }

这里是本地源代码实现:

@Override
public Observable<Page> search(@NonNull final String query) {
        return Observable.create(new Observable.OnSubscribe<Page>() {
            @Override
            public void call(Subscriber<? super Page> subscriber) {
                final Realm realm = Realm.getInstance(mRealmConfiguration);
                final Page page = realm.where(Page.class)
                        .equalTo("query", query)
                        .findFirst();
                if (page != null && page.isLoaded() && page.isValid()) {
                    Log.i("data from", "realm");
                    subscriber.onNext(realm.copyFromRealm(page));
                } else {
                    Observable.empty();
                }
                subscriber.onCompleted();
                realm.close();
            }
        });
    }

关键是我创建了一个新的Observable并从realm中获取数据。因此,我们创建领域实例并在同一线程中使用它。(io线程)。我们创建对象的副本来摆脱非法的状态异常。

当我们从realm if null中获取数据时,我们返回一个空的可观察对象,以避免在concat操作中卡住。

如果我们得到的页面是有效的,我们发送给订阅者并完成操作。

下面是我们如何保存从remote到realm的数据:
@Override
public void save(@NonNull String query, @NonNull Page page) {
        final Realm realm = Realm.getInstance(mRealmConfiguration);
        realm.beginTransaction();
        final Page p = realm.createObject(Page.class);
        p.setQuery(query);
        p.setId(page.getId());
        p.setTitle(page.getTitle());
        p.setContent(page.getContent());
        realm.copyToRealmOrUpdate(p);
        realm.commitTransaction();
        realm.close();
    }

下面是示例源代码。https://github.com/savepopulation/wikilight

好运。

从技术上讲,两种解决方案都有效,这是一个问题,正如Viraj Tank所说-"安全集成"与"深度集成"。

但是,适当的深度集成方法应该是从服务中单独下载,并在底层Realm中侦听更改的订阅者。(realmResults.asObservable().subscribe()).



老实说,我忍不住觉得这在概念上有缺陷。

首先,Realm查询在创建时在主线程上执行
@Override
public Observable<Page> search(@NonNull final String query) {
        return Realm.getDefaultInstance().where(Page.class)

创建一个永远不会关闭的Realm实例。

此外,它将asObservable()first()结合使用,这使我想知道为什么您首先通过asObservable()向结果添加更改侦听器,而不是仅仅调用Observable.just(results)

然后,似乎远程数据源获取并将元素添加到Realm并立即显示下载的项,而不是由Realm通过更改侦听器直接管理元素,从而提供自动更新。在这种情况下,我真的不确定Realm在做什么。

无论如何,我最初的猜测是,您可能能够使您的代码与以下行 一起工作
final Observable<Page> localResult = mSearchLocalDataSource.search(query)
                                                           .subscribeOn(AndroidSchedulers.mainThread());
final Observable<Page> remoteResult = mSearchRemoteDataSource.search(query)
            .subscribeOn(Schedulers.io())
            .doOnNext(new Action1<Page>() {
                @Override
                public void call(Page page) {
                    //mSearchLocalDataSource.save(query, page);
                    //mResultCache.put(query, page);
                }
            });

考虑到你似乎不依赖Realm的自动更新功能,你可以考虑使用realm.copyFromRealm(obj)来创建一个可以在线程之间传递的非托管副本。


但实际上,为了正确使用Realm,您应该有两个订阅——从网络到Realm的单个数据流;以及RealmResults<Page>.asObservable()的订阅,它会在网页被网络可观察对象写入时通知你——看看Christian Melchior的帖子。

就我个人而言,我跳过了Realm可观察对象,因为RealmRecyclerViewAdapter处理它。因此,如果你在一个RecyclerView中显示多个元素,那么甚至不需要Realm可观察对象,因为RealmRecylerViewAdapter通过RealmChangeListener管理其自动更新,而不依赖于asObservable()来完成它。



编辑:

在将提问者的存储库分支为https://github.com/Zhuinden/wikilight之后,显然我一直都是对的。简单的零复制解决方案是为本地可观察对象添加subscribeOn(AndroidSchedulers.mainThread())

所以令人惊讶的是,没有太大的变化。

    final Observable<Page> localResult = mSearchLocalDataSource.search(query).filter(new Func1<Page, Boolean>() {
        @Override
        public Boolean call(Page page) {
            return page != null;
        }
    }).subscribeOn(AndroidSchedulers.mainThread());
    final Observable<Page> remoteResult = mSearchRemoteDataSource.search(query).subscribeOn(Schedulers.io())
            .doOnNext(new Action1<Page>() {
                @Override
                public void call(Page page) {
                    if (page != null) {
                        mSearchLocalDataSource.save(query, page);
                      //  mResultCache.put(query, page);
                    }
                }
            });

    return Observable.concat(localResult, remoteResult)
            .first()
            .map(new Func1<Page, Page>() {
                @Override
                public Page call(Page page) {
                    if (page == null) {
                        throw new NoSuchElementException("No result found!");
                    }
                    return page;
                }
            });

但公平地说,最初的解决方案似乎将自动更新从图片中删除,因此解决方案中没有使用RealmChangeListener s, RealmObject.asObservable()也没有使用,因此copyFromRealm()确实更有意义。要使自动更新工作,这个

@Override
public Observable<Page> search(@NonNull final String query) {
    return Observable.create(new Observable.OnSubscribe<Page>() {
        @Override
        public void call(Subscriber<? super Page> subscriber) {
            Realm realm = null;
            try {
                realm = mRealmManager.getRealm();
                final Page page = realm.where(Page.class).equalTo("query", query).findFirst();
                if(page != null && page.isLoaded() && page.isValid()) {
                    Log.i("data from", "realm");
                    subscriber.onNext(page);
                } else {
                    Log.i("data is", "empty");
                    Observable.empty();
                }
                subscriber.onCompleted();
            } finally {
                if(realm != null) {
                    mRealmManager.closeRealm(realm);
                }
            }
        }
    });
}

应该替换为:

@Override
public Observable<Page> search(@NonNull final String query) {
    Realm realm = mRealmManager.getRealm(); // UI thread only!
    final Page page = realm.where(Page.class).equalTo("query", query).findFirst();
    if(page != null) {
        Log.i("data from", "realm");
        return page.asObservable();
    } else {
        Log.i("data is", "empty");
        return Observable.empty();
    }
}

最后,一些额外的架构可以使它更好,但我想我还是睡觉吧。

相关内容

  • 没有找到相关文章

最新更新