io.reactivex.exceptions.UndeliverableException: the exception could not be delivered to the consumer because ...



UndeliverableException when using Completable

public Completable createBucketWithStorageClassAndLocation() {
    return Completable.complete()
            .doFinally(() -> {
                Bucket bucket =
                        storage.create(
                                BucketInfo.newBuilder(googleUploadObjectConfiguration.bucketName())
                                        .setStorageClass(storageClass)
                                        .setLocation(googleUploadObjectConfiguration.locationName())
                                        .build());
            }).doOnError(error -> LOG.error(error.getMessage()));
}

The exception thrown by Google Storage is the expected one, but I am trying to handle it in the doOnError method.

Caused by: com.google.cloud.storage.StorageException: You already own this bucket. Please select another name.

RxJava exception:

io.reactivex.exceptions.UndeliverableException: The exception could not be delivered to the consumer because it has already canceled/disposed the flow or the exception has nowhere to go to begin with. Further reading: https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling | com.google.cloud.storage.StorageException: You already own this bucket. Please select another name.
at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:367)
at io.reactivex.internal.operators.completable.CompletableDoFinally$DoFinallyObserver.runFinally(CompletableDoFinally.java:99)
at io.reactivex.internal.operators.completable.CompletableDoFinally$DoFinallyObserver.onComplete(CompletableDoFinally.java:79)
at io.micronaut.reactive.rxjava2.RxInstrumentedCompletableObserver.onComplete(RxInstrumentedCompletableObserver.java:64)
at io.reactivex.internal.disposables.EmptyDisposable.complete(EmptyDisposable.java:68)
at io.reactivex.internal.operators.completable.CompletableEmpty.subscribeActual(CompletableEmpty.java:27)
at io.reactivex.Completable.subscribe(Completable.java:2309)
at io.micronaut.reactive.rxjava2.RxInstrumentedCompletable.subscribeActual(RxInstrumentedCompletable.java:51)
at io.reactivex.Completable.subscribe(Completable.java:2309)
at io.reactivex.internal.operators.completable.CompletableDoFinally.subscribeActual(CompletableDoFinally.java:43)
at io.reactivex.Completable.subscribe(Completable.java:2309)
at io.micronaut.reactive.rxjava2.RxInstrumentedCompletable.subscribeActual(RxInstrumentedCompletable.java:51)
at io.reactivex.Completable.subscribe(Completable.java:2309)
at io.reactivex.internal.operators.completable.CompletablePeek.subscribeActual(CompletablePeek.java:51)
at io.reactivex.Completable.subscribe(Completable.java:2309)
at io.micronaut.reactive.rxjava2.RxInstrumentedCompletable.subscribeActual(RxInstrumentedCompletable.java:51)
at io.reactivex.Completable.subscribe(Completable.java:2309)
at io.reactivex.Completable.subscribe(Completable.java:2410)
at fete.bird.StartUp.onApplicationEvent(StartUp.java:24)
at fete.bird.StartUp.onApplicationEvent(StartUp.java:12)
at io.micronaut.context.DefaultBeanContext.notifyEventListeners(DefaultBeanContext.java:1323)
at io.micronaut.context.DefaultBeanContext.publishEvent(DefaultBeanContext.java:1308)
at io.micronaut.http.server.netty.NettyHttpServer.fireStartupEvents(NettyHttpServer.java:507)
at io.micronaut.http.server.netty.NettyHttpServer.start(NettyHttpServer.java:350)
at io.micronaut.http.server.netty.NettyHttpServer.start(NettyHttpServer.java:113)
at io.micronaut.runtime.Micronaut.lambda$start$2(Micronaut.java:77)
at java.base/java.util.Optional.ifPresent(Optional.java:176)
at io.micronaut.runtime.Micronaut.start(Micronaut.java:75)
at io.micronaut.runtime.Micronaut.run(Micronaut.java:311)
at io.micronaut.runtime.Micronaut.run(Micronaut.java:297)
at fete.bird.FeteBirdServiceApplication.main(FeteBirdServiceApplication.java:16)

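According to the RxJava documentation (https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling), I need to handle the error in my application.

I need to write the code below:

// If Java 8 lambdas are supported
RxJavaPlugins.setErrorHandler(e -> { });

My question is where I should put this code. I have a Micronaut application written in Java. Or is this the only way to handle the exception?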

You should add it to your application class:

RxJavaPlugins.setErrorHandler(e -> { });

RxJavaPlugins.setErrorHandler { e ->
    if (e is UndeliverableException) {
        // Merely log undeliverable exceptions
        log.error(e.message)
    } else {
        // Forward all others to current thread's uncaught exception handler
        Thread.currentThread().also { thread ->
            thread.uncaughtExceptionHandler.uncaughtException(thread, e)
        }
    }
}

More information here: https://github.com/instacart/truetime-android/issues/98

Use Completable.fromAction, and perhaps try catching the exception there instead of using doFinally:

Completable.fromAction(() -> {
    try {
        Bucket bucket = storage.create(
                BucketInfo.newBuilder(googleUploadObjectConfiguration.bucketName())
                        .setStorageClass(storageClass)
                        .setLocation(googleUploadObjectConfiguration.locationName())
                        .build());
    } catch (Throwable error) {
        LOG.error(error.getMessage());
    }
})

As @jonathan said, to solve this problem I just added the following code to my App class:

RxJavaPlugins.setErrorHandler { error ->
    var e = error
    if (e is UndeliverableException) {
        // Unwrap to the original exception when one is attached
        e = e.cause ?: e
    }
    if (e is IOException || e is SocketException) {
        // fine, irrelevant network problem or API that throws on cancellation
        return@setErrorHandler
    }
    if (e is InterruptedException) {
        // fine, some blocking code was interrupted by a dispose call
        return@setErrorHandler
    }
    if (e is NullPointerException || e is IllegalArgumentException) {
        // that's likely a bug in the application
        Thread.currentThread().uncaughtExceptionHandler?.uncaughtException(Thread.currentThread(), e) ?: run {
            FirebaseCrashlytics.log(Log.ERROR, TAG, "RxJavaPlugins uncaughtExceptionHandler is null but error is NullPointerException || error is IllegalArgumentException : $e")
        }
        return@setErrorHandler
    }
    if (e is IllegalStateException) {
        // that's a bug in RxJava or in a custom operator
        Thread.currentThread().uncaughtExceptionHandler?.uncaughtException(Thread.currentThread(), e) ?: run {
            FirebaseCrashlytics.log(Log.ERROR, TAG, "RxJavaPlugins uncaughtExceptionHandler is null but error is IllegalStateException : $e")
        }
        return@setErrorHandler
    }
    FirebaseCrashlytics.log(Log.ERROR, TAG, "Undeliverable exception received, not sure what to do $e")
}

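If you don't use FirebaseCrashlytics, you can adapt this code to your own logging system. The code is adapted from the source at https://github.com/ReactiveX/RxJava/wiki/What%27s-different-in-2.0#error-handling (the URL is given in the error message itself).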
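
For a Java application such as the Micronaut one in the question, the same decision logic might be sketched roughly as below. This is only an illustration, not the RxJava wiki's code: the class name UndeliverableErrors, the Action enum, and the wrapperType and logger parameters are all hypothetical names introduced here. The wrapper type is passed in as a parameter so the snippet compiles with the JDK alone; in a real application it would presumably be UndeliverableException.class.

```java
import java.io.IOException;
import java.util.function.Consumer;

// Hypothetical helper: mirrors the branching of the Kotlin handler above.
public final class UndeliverableErrors {

    /** What the handler decides to do with an error. */
    public enum Action { IGNORE, CRASH, LOG }

    private UndeliverableErrors() {}

    /** Returns the cause when error is an instance of wrapperType and has a cause, else error itself. */
    public static Throwable unwrap(Throwable error, Class<? extends Throwable> wrapperType) {
        if (wrapperType.isInstance(error) && error.getCause() != null) {
            return error.getCause();
        }
        return error;
    }

    /** Classifies an already unwrapped error. */
    public static Action classify(Throwable e) {
        // SocketException extends IOException, so one check covers both
        if (e instanceof IOException || e instanceof InterruptedException) {
            return Action.IGNORE;
        }
        // Likely a bug in the application, in RxJava, or in a custom operator
        if (e instanceof NullPointerException
                || e instanceof IllegalArgumentException
                || e instanceof IllegalStateException) {
            return Action.CRASH;
        }
        return Action.LOG;
    }

    /** Unwraps, classifies, then ignores, forwards to the uncaught handler, or logs. */
    public static void handle(Throwable error,
                              Class<? extends Throwable> wrapperType,
                              Consumer<String> logger) {
        Throwable e = unwrap(error, wrapperType);
        Action action = classify(e);
        if (action == Action.IGNORE) {
            return;
        }
        if (action == Action.CRASH) {
            Thread thread = Thread.currentThread();
            thread.getUncaughtExceptionHandler().uncaughtException(thread, e);
            return;
        }
        logger.accept("Undeliverable exception received: " + e);
    }
}
```

Assuming an SLF4J-style LOG field, it could then be registered once at startup, for example before Micronaut.run in the main method:

RxJavaPlugins.setErrorHandler(e -> UndeliverableErrors.handle(e, UndeliverableException.class, LOG::error));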
