Rxjava + Realm 访问来自不正确的线程



我从 Realm 读取/写入时收到此异常

06-19

09:49:26.352 11404-11404/****** E/ContentValues: loadData: OnError Realm 访问来自不正确的线程。Realm 对象只能在创建它们的线程上访问。 java.lang.IllegalStateException:来自不正确线程的领域访问。Realm 对象只能在创建它们的线程上访问。 at io.realm.BaseRealm.checkIfValid(BaseRealm.java:385) at io.realm.RealmResults.isLoaded(RealmResults.java:115) at io.realm.OrderedRealmCollectionImpl.size(OrderedRealmCollectionImpl.java:307) at io.realm.RealmResults.size(RealmResults.java:60) at java.util.AbstractCollection.isEmpty(AbstractCollection.java:86) at/****** .lambda$loadData$0(SplashPresenter.java:42) 在/****** $$Lambda$1.测试(未知来源) at io.reactivex.internal.operator.observable.ObservableFilter$FilterObserver.onNext(ObservableFilter.java:45) at io.reactivex.observers.SerializedObserver.onNext(SerializedObserver.java:111) at io.reactivex.internal.operator.observable.ObservableDelay$DelayObserver$1.run(ObservableDelay.java:84) at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:59) at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:51) at java.util.concurrent.FutureTask.run(FutureTask.java:237) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607) at java.lang.Thread.run(Thread.java:761)

这是代码:

mSubscribe = Observable.just(readData())
.delay(DELAY, TimeUnit.SECONDS)
.filter(value -> !value.isEmpty())
.switchIfEmpty(createRequest())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread()).subscribe(data -> {
getView().hideLoading();
writeData(data);
}, 
(throwable -> {
}));

读取数据

private List<CategoryModel> readData() {
Realm defaultInstance = Realm.getDefaultInstance();
List<CategoryModel> title = defaultInstance.where(CategoryModel.class).findAllSorted("title");
defaultInstance.close();
return title;
}

写入数据

private void writeData(List<CategoryModel> categoryModels) {
try {
Realm defaultInstance = Realm.getDefaultInstance();
defaultInstance.executeTransactionAsync(realm -> realm.insertOrUpdate(categoryModels));
defaultInstance.close();
} finally {
getView().notifyActivity(categoryModels);
}
}

如何使用正确的线程遵循此逻辑?

跨线程使用 Realm 的唯一规则是记住 Realm、RealmObject 或 RealmResults 实例不能跨线程传递

当您想要从不同的线程访问相同的数据时,您应该简单地获取一个新的 Realm 实例(即Realm.getDefaultInstance()),并通过查询获取您的对象(然后在线程末尾关闭 Realm)。

这些对象将映射到磁盘上的相同数据,并且可以从任何线程读取和写入!您还可以使用如下所示realm.executeTransactionAsync()在后台线程上运行代码 .

如何使用正确的线程遵循此逻辑?

通过不尝试读取 UI 线程的Schedulers.io()(毕竟,Realm 提供了自动更新延迟加载的代理视图,这些视图会为 UI 线程上的数据提供更改通知)。


所以取而代之的是这个

mSubscribe = Observable.just(readData())
.delay(DELAY, TimeUnit.SECONDS)
.filter(value -> !value.isEmpty())
.switchIfEmpty(createRequest())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread()).subscribe(data -> {
getView().hideLoading();
writeData(data);
}, 
(throwable -> {
}));
private List<CategoryModel> readData() {
Realm defaultInstance = Realm.getDefaultInstance();
List<CategoryModel> title = defaultInstance.where(CategoryModel.class).findAllSorted("title");
defaultInstance.close();
return title;
}
private void writeData(List<CategoryModel> categoryModels) {
try {
Realm defaultInstance = Realm.getDefaultInstance();
defaultInstance.executeTransactionAsync(realm -> realm.insertOrUpdate(categoryModels));
defaultInstance.close();
} finally {
getView().notifyActivity(categoryModels);
}
}

你应该有类似的东西

private Observable<List<CategoryModel>> readData() { // Flowable with LATEST might be better.
return io.reactivex.Observable.create(new ObservableOnSubscribe<List<CategoryModel>>() {
@Override
public void subscribe(ObservableEmitter<List<CategoryModel>> emitter)
throws Exception {
final Realm observableRealm = Realm.getDefaultInstance();
final RealmResults<CategoryModel> results = observableRealm.where(CategoryModel.class).findAllSortedAsync("title");
final RealmChangeListener<RealmResults<CategoryModel>> listener = results -> {
if(!emitter.isDisposed() && results.isLoaded()) {
emitter.onNext(results);
}
};
emitter.setDisposable(Disposables.fromRunnable(() -> {
if(results.isValid()) {
results.removeChangeListener(listener);
}
observableRealm.close();
}));
results.addChangeListener(listener);
}
}).subscribeOn(AndroidSchedulers.mainThread())
.unsubscribeOn(AndroidSchedulers.mainThread());
}
private void setSubscription() {
mSubscribe = readData()
.doOnNext((list) -> {
if(list.isEmpty()) {
Single.fromCallable(() -> this::createRequest)
.subscribeOn(Schedulers.io())
.subscribe((data) -> {
writeData(data);
});
}
}).subscribe(data -> {
if(!data.isEmpty()) {
getView().hideLoading();
getView().notifyActivity(data);
}
}, throwable -> {
throwable.printStackTrace();
});
}
private void writeData(List<CategoryModel> categoryModels) {
try(Realm r = Realm.getDefaultInstance()) {
r.executeTransaction(realm -> realm.insertOrUpdate(categoryModels));
}
}
void unsubscribe() {
mSubscribe.dispose();
mSubscribe = null;
}

这样(如果我没有搞砸任何东西),你最终会得到这里和这里描述的反应式数据层,除了没有映射整个结果的额外开销。

编辑:

从 Realm 4.0 开始,RealmResults 可以直接作为 Flowable 公开(在 UI 线程或后台循环器线程上)。

public Flowable<List<MyObject>> getLiveResults() {
try(Realm realm = Realm.getDefaultInstance()) {
return realm.where(MyObject.class) 
.findAllAsync()
.asFlowable()
.filter(RealmResults::isLoaded);
}
}

您需要将所需数据从领域对象提取到 POJO 中,并使用 map 运算符发出 POJO,以便视图对象可以使用 android 主线程上的 pojo 使用来自领域的数据进行更新。

您只能在事务中操作 Realm 对象,或者只能在您读取/写入这些对象的线程中操作这些对象。 在你的例子中,你从 readData 方法得到一个 RealmResult ,而使用 RxJava 时,你正在切换导致异常的线程。使用 copyFromRealm 从 realm 获取数据,这些数据将作为普通对象而不是 realm 对象返回。

最新更新