大家好,我的BaseActivity中有以下函数。
override fun <T> subscribeToInternet(observable: Observable<Response<BaseResponse<T>>>, observer: Observer<BaseResponse<T>>) {
observable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnSubscribe { observer.onSubscribe(it) }
.doOnError {
Log.d(TAG, it.message)
observer.onError(it)
}
.doOnComplete { observer.onComplete() }
.doOnNext {
Log.d(TAG, "${it.body() ?: "no body"}")
Log.d(TAG, "${it.errorBody()?.string() ?: "no error body"}")
Log.d(TAG, it.code().toString())
when {
it.code() == 401 -> {
view.userUnauthenticated()
observer.onNext(BaseResponse(false, "unauthenticated", null))
Log.d(TAG, "UNAUTHENTICATED")
}
it.code() == 423 -> {
view.userBlocked()
observer.onNext(BaseResponse(false, "blocked", null))
Log.d(TAG, "BLOCKED")
}
it.isSuccessful -> observer.onNext(it.body()!!)
it.code() == 429 -> observer.onNext(BaseResponse(false, "Too many attempts", null))
it.code() == 400 -> observer.onNext(BaseResponse(false, "Invalid Email or password", null))
else -> observer.onNext(BaseResponse(false, "", null))
}
}
.subscribe()
}
如果服务器返回响应,我会处理观察者的 onNext(( 中的错误,但是当设备上根本没有互联网连接时出现问题!!它引发以下异常
at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onError(ObservableDoOnEach.java:119)
at io.reactivex.internal.observers.DisposableLambdaObserver.onError(DisposableLambdaObserver.java:64)
at io.reactivex.internal.operators.observable.ObservableObserveOn$ObserveOnObserver.checkTerminated(ObservableObserveOn.java:276)
at io.reactivex.internal.operators.observable.ObservableObserveOn$ObserveOnObserver.drainNormal(ObservableObserveOn.java:172)
at io.reactivex.internal.operators.observable.ObservableObserveOn$ObserveOnObserver.run(ObservableObserveOn.java:252)
这是前面提到的函数的用法
override fun sendLoginRequest(email: String, password: String, fcm_token: String) {
subscribeToInternet(dataManager.sendLoginRequest(email, password, fcm_token), this)
}
override fun onComplete() {
}
override fun onSubscribe(d: Disposable) {
DisposableManager.add(d)
}
override fun onNext(t: BaseResponse<LoginData>) {
if(t.status) {
Log.d(TAG, "${t.data}")
dataManager.createLoginSession(t.data!!)
view.loginSuccess()
} else {
Log.d(TAG, t.message)
view.showError(t.message)
}
}
override fun onError(e: Throwable) {
view.showToastError()
Log.d(TAG, e.message)
}
这个问题与您订阅可观察的方式有关。根据 文档 使用subscribe()
而不传递处理错误的操作时,当源抛出异常时,您应该会收到OnErrorNotImplementedException
- 这是因为使用了来自 RxJavaPlugins 的默认异常处理程序。
若要解决此问题,请使用带有onError
参数的重载subscribe
方法之一。例如,公开最终一次性订阅(消费者在下一个, 消费者错误(