是否有可能为这个Android LiveData/Threading代码给出某种并发性问题或意外结果?



我在整个项目中有一些类似的代码,这些代码位于我的ViewModels内部,它订阅了一个RxJava可观察对象subscribedOn scheduler .computation()。我有一个MutableLiveData<Integer> isLoadedLiveData对象,它发布了一个更新的int标志,将在我的活动中观察到。

基本上,在这个ViewModel中,如果完成了3个订阅,那么isLoadedLiveData将等于3,因为每个订阅都会增加isLoadedLiveData的int标志值。在活动中isLoadedLiveDataLiveData观察者中,一旦int值等于3,我就会设置该活动的视图。所以它让我知道ViewModel数据已经准备好了,每个需要的数据都可以从每个相应的getter返回。我这样做,所以我不需要一堆LiveData对象在我的活动,可以代替只有一个标志,告诉我当我所有的单独的数据被加载。下面是我的ViewModel中的一段代码:

Disposable disposable1 = this.thingRepository.getThingSingle()
.observeOn(Schedulers.computation())
.subscribe(thing -> {
name = thing.getName();
abbrev = thing.getAbbrev();
stuff = thing.getStuff();
loaded++;
isLoadedLiveData.postValue(loaded);
});

现在我先说我不是很精通java/android并发,所以我不太确定,但我不认为这段代码会给我带来某种问题。但我不能百分之百确定。在某些情况下,这可能会有问题吗?或者代码和线程的顺序不会有问题吗?

name,abbrevstuff这样的数据是我的ViewModel的字段,最终都返回到我的Activity(只是通过简单的getter,没有观察者)。这些数据块总是正确更新和安全访问从我的活动在主线程上,因为int标志被张贴到isLoadedLiveData总是发生在数据更新后的后台线程。我不完全确定,因为这些值正在后台线程上更新,然后主线程访问它。我对并发不够了解。

这与RxJava没有多大关系,但这就是我在这种情况下处理线程的方式。它更多地与Java线程/Android线程/LiveData有关。我不完全确定LiveData.postValue是如何工作的,但我假设它被放在主线程的循环器中,以执行我的LiveDataObserver回调,其中传递的值。无论发生什么情况,isLoadedLiveData.postValue(loaded)上面设置的值都可以安全访问并正确更新吗?

感谢您的回复!谢谢。

编辑:这里是新的代码:

Disposable disposable1 = this.thingRepository.getThingSingle()
.subscribeOn(Schedulers.computation())
.doOnSuccess(thing -> {
name = thing.getName();
abbrev = thing.getAbbrev();
heavyOperationResult = heavyOpperation(thing);
stuff = thing.getStuff();
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(thing -> {
loaded++;
isLoadedLiveData.setValue(loaded);
});

我添加了.subscribeOn(Schedulers.computation()),尽管我的存储库返回一个已经在Schedulers.computation()上订阅的Single,只是为了表示在本示例中订阅了哪个线程。我还添加了heavyOperation(thing),只是为了表明我需要它在后台,因为我做一些计算可能花费太长时间的主UI线程。

我不同意这个代码片段,在我看来你是在滥用Rx。

两件事:

  1. doOnSuccess是用来处理副作用的,而不是用来做繁重的计算。

  2. 不要从Rx流中取出状态,而是将其传递到下游。name = thing.getName();语句和doOnSuccess中的其他类似语句非常危险,因为您可能在不同的线程中有多个流修改相同的状态。

你想要的是将状态传递到下游,并最终在LiveData中发布,然后你观察它的变化。

Disposable disposable = thingRepository.getThingSingle()
.subscribeOn(Schedulers.io())
.flatMap(thing ->
Single.fromCallable(() -> heavyOperation(thing))
.map(heavyOperationResult -> new Pair<>(thing, heavyOperationResult))
.subscribeOn(Schedulers.computation()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(pair -> {
Thing thing = pair.getValue0();
HeavyOperationResult heavyOperationResult = pair.getValue1();
thingLiveData.setValue(thing);
heavyOperationLiveData.setValue(heavyOperationResult);
});