OnErrorNotImplementedException using RxJava2 and Retrofit2 M



Even though I handle the error downstream(?), I still get an OnErrorNotImplementedException and the app crashes.

Exception:

E/AndroidRuntime: FATAL EXCEPTION: RxCachedThreadScheduler-1
Process: pl.netlandgroup.smartsab, PID: 9920
io.reactivex.exceptions.OnErrorNotImplementedException: HTTP 401 Unauthorized
at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
at io.reactivex.internal.observers.LambdaObserver.onError(LambdaObserver.java:74)
at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onError(ObservableDoOnEach.java:119)
at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onError(ObservableDoOnEach.java:119)
at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeOnObserver.onError(ObservableSubscribeOn.java:63)
at retrofit2.adapter.rxjava2.BodyObservable$BodyObserver.onNext(BodyObservable.java:56)
at retrofit2.adapter.rxjava2.BodyObservable$BodyObserver.onNext(BodyObservable.java:37)
at retrofit2.adapter.rxjava2.CallExecuteObservable.subscribeActual(CallExecuteObservable.java:43)
at io.reactivex.Observable.subscribe(Observable.java:10838)
at retrofit2.adapter.rxjava2.BodyObservable.subscribeActual(BodyObservable.java:34)
at io.reactivex.Observable.subscribe(Observable.java:10838)
at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96)
at io.reactivex.Scheduler$DisposeTask.run(Scheduler.java:452)
at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:61)
at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:52)
at java.util.concurrent.FutureTask.run(FutureTask.java:237)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607)
at java.lang.Thread.run(Thread.java:761)
Caused by: retrofit2.adapter.rxjava2.HttpException: HTTP 401 Unauthorized
at retrofit2.adapter.rxjava2.BodyObservable$BodyObserver.onNext(BodyObservable.java:54)
at retrofit2.adapter.rxjava2.BodyObservable$BodyObserver.onNext(BodyObservable.java:37) 
at retrofit2.adapter.rxjava2.CallExecuteObservable.subscribeActual(CallExecuteObservable.java:43) 
at io.reactivex.Observable.subscribe(Observable.java:10838) 
at retrofit2.adapter.rxjava2.BodyObservable.subscribeActual(BodyObservable.java:34) 
at io.reactivex.Observable.subscribe(Observable.java:10838) 
at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96) 
at io.reactivex.Scheduler$DisposeTask.run(Scheduler.java:452) 
at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:61) 
at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:52) 
at java.util.concurrent.FutureTask.run(FutureTask.java:237) 
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607) 
at java.lang.Thread.run(Thread.java:761) 

Retrofit repository:

class RetrofitRepository(retrofit: Retrofit) {
    val apiService: ApiService = retrofit.create(ApiService::class.java)
    var size: Int = 0

    fun getMapResponse(pageIndex: Int = 0): Observable<MapResponse> {
        return apiService.getMapResponse(pageIndex = pageIndex)
            .doOnError { Log.d("error", it.message) }
            .doOnNext { Log.d("currThread", Thread.currentThread().name) }
    }

    fun getItemsFormResponses(): Observable<List<Item>> {
        val list = mutableListOf<Observable<List<Item>>>()
        val resp0 = getMapResponse()
        resp0.subscribe { size = it.totalCount }
        var accum = 0
        do {
            list.add(getMapResponse(accum).map { it.items })
            accum++
        } while (list.size * 200 < size)
        return Observable.merge(list)
    }
}

The interactor that observes the results:

class MapInteractor @Inject constructor(private val repository: RetrofitRepository) {
    fun getMapItems(): Observable<MapViewState> {
        return repository.getItemsFormResponses()
            .map {
                if (it.isEmpty()) {
                    return@map MapViewState.EmptyResult()
                } else {
                    val mapItems = it.map { it.toMapItem() }
                    return@map MapViewState.MapResult(mapItems)
                }
            }
            .doOnNext { Log.d("currThread", Thread.currentThread().name) }
            .startWith(MapViewState.Loading())
            .onErrorReturn { MapViewState.Error(it) }
    }
}

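The onErrorReturn { MapViewState.Error(it) } item is emitted correctly (I can see the right content rendered on screen just before the app crashes). How can I avoid this exception while still keeping the MVI architecture?

EDIT

The answer provided by dimsuz is the correct solution, although to merge the pages and return a single Observable with all the items, it had to be modified to: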

fun getMapItems(): Observable<List<Item>> {
    return getMapResponse().map {
        val size = it.totalCount
        val list = mutableListOf<Observable<List<Item>>>()
        var accum = 0
        do {
            list.add(getMapResponse(accum++).map { it.items })
        } while (list.size * 200 < size)
        return@map list.zip { it.flatten() }
    }.mergeAll()
}

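The solution was already shown in the previous answer, but the exact reason you get the OnErrorNotImplementedException is that you probably subscribed with only the "onSuccess" (onNext) consumer.

You should add a second consumer that handles the error event.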

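getMapResponse()
    .subscribe(
        { size = it.totalCount },
        { /* Do something with the error */ }
    )

"doOnError" does not stop you from getting the exception. It only runs a side effect when the error passes through; the error still travels on to the subscriber.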

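To make the difference concrete, here is a minimal sketch, assuming RxJava 2 is on the classpath. The failing source and the function name doOnErrorDemo are made up for illustration and stand in for the Retrofit call; the point is only that doOnError observes the error while the second subscribe argument is what actually handles it.

```kotlin
import io.reactivex.Observable

// Returns (what doOnError saw, what the error consumer saw).
fun doOnErrorDemo(): Pair<List<String>, List<String>> {
    val sideEffects = mutableListOf<String>()
    val handled = mutableListOf<String>()

    // Hypothetical failing source standing in for apiService.getMapResponse().
    val failing = Observable.error<Int>(IllegalStateException("HTTP 401 Unauthorized"))

    failing
        .doOnError { sideEffects.add("logged: ${it.message}") } // side effect only, error continues
        .subscribe(
            { /* onNext: never called for this source */ },
            { handled.add("handled: ${it.message}") }           // terminal error consumer
        )

    return sideEffects to handled
}

fun main() {
    val (sideEffects, handled) = doOnErrorDemo()
    println(sideEffects)
    println(handled)
}
```

Both lists end up with one entry: the error is logged by doOnError and then still reaches the subscriber. If the second lambda were removed, nothing would handle it.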
I believe the error is thrown in getItemsFormResponses(), at these lines:

val resp0 = getMapResponse()
resp0.subscribe { size = it.totalCount }

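Here you subscribe but do not handle the error case. In fact this code is incorrect, because you split the Rx chain into two independent parts, which you should not do.

What you should do instead is something like this: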
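The following sketch, assuming RxJava 2, may help show why the onErrorReturn in the interactor does not protect the side subscription. The function name splitChainDemo and the failing source are hypothetical. A global error handler is installed via RxJavaPlugins only so the demo can record the unhandled error instead of crashing the thread.

```kotlin
import io.reactivex.Observable
import io.reactivex.plugins.RxJavaPlugins

// Returns (errors nobody handled, items the guarded chain emitted).
fun splitChainDemo(): Pair<List<String>, List<String>> {
    val unhandled = mutableListOf<String>()
    val emitted = mutableListOf<String>()

    // Record errors that would otherwise go to the thread's uncaught-exception handler.
    RxJavaPlugins.setErrorHandler { unhandled.add(it.javaClass.simpleName) }

    // Hypothetical failing source standing in for getMapResponse().
    val source = Observable.error<Int>(IllegalStateException("HTTP 401 Unauthorized"))

    // Chain 1: the side subscription, with only an onNext consumer.
    source.subscribe { /* size = it.totalCount in the question */ }

    // Chain 2: the chain that is returned and guarded by onErrorReturn.
    source.map { "value $it" }
        .onErrorReturn { "error state" }
        .subscribe { emitted.add(it) }

    RxJavaPlugins.setErrorHandler(null) // restore the default behaviour
    return unhandled to emitted
}

fun main() {
    val (unhandled, emitted) = splitChainDemo()
    println(unhandled)
    println(emitted)
}
```

The guarded chain emits its error state, which matches the screen rendering correctly in the question, while the side subscription reports an OnErrorNotImplementedException on its own.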

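fun getItemsFormResponses(): Observable<List<Item>> {
    // flatMap (not map), so the merged inner Observable is flattened into this chain
    return getMapResponse().flatMap { resp0 ->
        val size = resp0.totalCount
        val list = mutableListOf<Observable<List<Item>>>()
        var accum = 0
        do {
            list.add(getMapResponse(accum).map { it.items })
            accum++
        } while (list.size * 200 < size)
        // last expression is the lambda's result; a bare return is not allowed here
        Observable.merge(list)
    }
}

That is, extract size by extending the chain with an operator, rather than breaking it with subscribe().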
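As a runnable illustration of the same idea, here is a sketch assuming RxJava 2. Page, fakePage, allItems and PAGE_SIZE are hypothetical stand-ins for the question's MapResponse and API call; the page size of 200 is taken from the question's loop condition. It uses concat rather than the merge used above, purely so the output order is deterministic, and it reuses the first response instead of requesting page 0 a second time.

```kotlin
import io.reactivex.Observable

// Hypothetical stand-in for the question's MapResponse.
data class Page(val totalCount: Int, val items: List<Int>)

const val PAGE_SIZE = 200 // assumed from `list.size*200` in the question

// Fake paged source replacing apiService.getMapResponse(pageIndex).
fun fakePage(pageIndex: Int, total: Int): Observable<Page> =
    Observable.fromCallable {
        val from = pageIndex * PAGE_SIZE
        val to = minOf(from + PAGE_SIZE, total)
        Page(total, (from until to).toList())
    }

// One chain: the first response decides how many further pages to request.
fun allItems(total: Int): Observable<List<Int>> =
    fakePage(0, total).flatMap { first ->
        val pageCount = maxOf(1, (first.totalCount + PAGE_SIZE - 1) / PAGE_SIZE)
        val sources = mutableListOf(Observable.just(first.items))
        for (i in 1 until pageCount) {
            sources.add(fakePage(i, total).map { it.items })
        }
        Observable.concat(sources)
    }

fun main() {
    val pages = allItems(450).toList().blockingGet()
    println(pages.map { it.size }) // [200, 200, 50]
}
```

Because everything hangs off a single chain, an error from any page request flows to whatever onErrorReturn or error consumer the caller attaches downstream.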
