如果用户在处理器中引发错误,FluxSink.next()将引发错误/异常


Flux.<Integer>push(sink -> {
try {
for (int i = 0; i < 10 && !sink.isCancelled(); i++) {
sink.next(i);
}
sink.complete();
} catch (Exception e) {
System.out.println("Error in wrong place " + e);
}
})
.doOnNext(i -> { if (i % 5 == 0) throw new RuntimeException("Error"); })
.subscribe(i -> System.out.println("i = " + i));

该代码的输出为

`Error in wrong place reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: Error` 

这意味着异常是由推送方法中的sink.next(i);重新抛出的,而不是推送到错误反应通道。为什么?

它被推送到流量错误通道,但您在该通道上没有错误订阅者,您可以添加:

.doOnError(e -> System.out.println("Now error in good place = " + e))

.subscribe(i -> System.out.println("i = " + i), e -> System.out.println("Now error in good place = " + e));

完整的代码是:

Flux.<Integer>push(sink -> {
try {
for (int i = 0; i < 10 && !sink.isCancelled(); i++) {
sink.next(i);
}
sink.complete();
} catch (Exception e) {
System.out.println("Error in wrong place " + e);
}
})
.doOnNext(i -> { if (i % 5 == 0) throw new RuntimeException("Error"); })
.doOnError(e -> System.out.println("Now error in good place = " + e)) //it's missing
.subscribe(i -> System.out.println("i = " + i));

另请参阅这个答案,为什么您需要订阅者和标准(foxnext(((subscriber不会抛出错误:当使用projectreactor 中的Flux时,生成和处理异常的正确方法是什么

相关内容

最新更新