我在整个项目中有一些类似的代码,这些代码位于我的ViewModels内部,它订阅了一个RxJava可观察对象subscribedOn scheduler .computation()。我有一个MutableLiveData<Integer> isLoadedLiveData
对象,它发布了一个更新的int标志,将在我的活动中观察到。
基本上,在这个ViewModel
中,如果完成了3个订阅,那么isLoadedLiveData
将等于3,因为每个订阅都会增加isLoadedLiveData
的int标志值。在活动中isLoadedLiveData
的LiveData
观察者中,一旦int值等于3,我就会设置该活动的视图。所以它让我知道ViewModel
数据已经准备好了,每个需要的数据都可以从每个相应的getter返回。我这样做,所以我不需要一堆LiveData对象在我的活动,可以代替只有一个标志,告诉我当我所有的单独的数据被加载。下面是我的ViewModel
中的一段代码:
Disposable disposable1 = this.thingRepository.getThingSingle()
.observeOn(Schedulers.computation())
.subscribe(thing -> {
name = thing.getName();
abbrev = thing.getAbbrev();
stuff = thing.getStuff();
loaded++;
isLoadedLiveData.postValue(loaded);
});
现在我先说我不是很精通java/android并发,所以我不太确定,但我不认为这段代码会给我带来某种问题。但我不能百分之百确定。在某些情况下,这可能会有问题吗?或者代码和线程的顺序不会有问题吗?
像name
,abbrev
和stuff
这样的数据是我的ViewModel
的字段,最终都返回到我的Activity(只是通过简单的getter,没有观察者)。这些数据块总是正确更新和安全访问从我的活动在主线程上,因为int标志被张贴到isLoadedLiveData
总是发生在数据更新后的后台线程。我不完全确定,因为这些值正在后台线程上更新,然后主线程访问它。我对并发不够了解。
这与RxJava没有多大关系,但这就是我在这种情况下处理线程的方式。它更多地与Java线程/Android线程/LiveData有关。我不完全确定LiveData.postValue
是如何工作的,但我假设它被放在主线程的循环器中,以执行我的LiveDataObserver
回调,其中传递的值。无论发生什么情况,isLoadedLiveData.postValue(loaded)
上面设置的值都可以安全访问并正确更新吗?
感谢您的回复!谢谢。
编辑:这里是新的代码:
Disposable disposable1 = this.thingRepository.getThingSingle()
.subscribeOn(Schedulers.computation())
.doOnSuccess(thing -> {
name = thing.getName();
abbrev = thing.getAbbrev();
heavyOperationResult = heavyOpperation(thing);
stuff = thing.getStuff();
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(thing -> {
loaded++;
isLoadedLiveData.setValue(loaded);
});
我添加了.subscribeOn(Schedulers.computation())
,尽管我的存储库返回一个已经在Schedulers.computation()
上订阅的Single,只是为了表示在本示例中订阅了哪个线程。我还添加了heavyOperation(thing)
,只是为了表明我需要它在后台,因为我做一些计算可能花费太长时间的主UI线程。
我不同意这个代码片段,在我看来你是在滥用Rx。
两件事:
-
doOnSuccess
是用来处理副作用的,而不是用来做繁重的计算。 -
不要从Rx流中取出状态,而是将其传递到下游。
name = thing.getName();
语句和doOnSuccess
中的其他类似语句非常危险,因为您可能在不同的线程中有多个流修改相同的状态。
你想要的是将状态传递到下游,并最终在LiveData中发布,然后你观察它的变化。
Disposable disposable = thingRepository.getThingSingle()
.subscribeOn(Schedulers.io())
.flatMap(thing ->
Single.fromCallable(() -> heavyOperation(thing))
.map(heavyOperationResult -> new Pair<>(thing, heavyOperationResult))
.subscribeOn(Schedulers.computation()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(pair -> {
Thing thing = pair.getValue0();
HeavyOperationResult heavyOperationResult = pair.getValue1();
thingLiveData.setValue(thing);
heavyOperationLiveData.setValue(heavyOperationResult);
});