触发错误的项目反应器继续值为空



我在使用项目反应器编写的代码时遇到了一些问题:

<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.2.12.RELEASE</version>
</dependency>

请考虑以下代码:

class Scratch {
public static void main(String[] args) {
ArrayBlockingQueue<Long> q = new ArrayBlockingQueue<>(10);
startProducer(q);
Flux.<Long> create(sink -> consumeItemsFromQueue(q, sink))
.doOnNext(ctx -> System.out.println("Processing " + ctx))
.flatMap(ctx -> Flux.push((sink)->{ throw new IllegalArgumentException("bum!");}))
.onErrorContinue((ex, obj)->{
System.err.println("Caught error "+ex.getMessage() +" in obj:" +obj);
})
.doOnNext(element -> System.out.println("Do On NExt: " + element))
.subscribe();
}
private static void consumeItemsFromQueue(ArrayBlockingQueue<Long> q, FluxSink<Long> sink) {
while (true) {
try {
sink.next(q.take());
} catch (Throwable t) {
System.err.println("Error in catch");
}
}
}
private static void startProducer(ArrayBlockingQueue<Long> q) {
Thread thread = new Thread(() -> {
while (true) {
try {
q.put(System.currentTimeMillis());
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
thread.start();
}
}

此代码生成以下输出:

Processing 1580494319870
Caught error bum! in obj:null
Processing 1580494321871
Caught error bum! in obj:null

根据onErrorContinue中的文档,对象应该是导致错误的值。因此,我希望它是flatMapctx对象。相反,它是空的。

这是一个错误还是我对文档的理解有缺陷?

关于onErrorContinue行为的推理可能相当违反直觉,所以我总是建议尽可能避免使用它。

根据 onErrorContinue 中的文档,对象应该是导致错误的值。因此,我希望它是来自flatMap的ctx对象。相反,它是空的。

啊,但ctx不是导致错误的值,因为您的外部flatMap()调用工作正常 - 它只是中继内部Flux(示例中的Flux.push()行(中发生的错误。由于没有导致此错误的值(它只是抛出了一个异常(,因此不会报告任何值。因此,您通过此示例报告的行为正是我所期望的。

如果您将该行更改为以下内容:

.flatMap(ctx -> Flux.push(sink -> sink.next(ctx)).flatMap(x -> Mono.error(new IllegalArgumentException("bum!"))))

。或:

.flatMap(ctx -> Flux.just(ctx).flatMap(x -> Mono.error(new IllegalArgumentException("bum!"))))

。然后你会看到类似于Caught error bum! in obj:1591657236326的内容,因为异常实际上有一个原因,即由处理该值的运算符引起的错误。

最新更新