项目反应器:如何blockLast()工作?



来自blockLast()的文档:

Subscribe to this Flux and block indefinitely until the upstream signals its last value or completes. Returns that value, or null if the Flux completes empty. In case the Flux errors, the original exception is thrown (wrapped in a RuntimeException if it was a checked exception).

我们举个例子:

Flux
.range(0, 1000)
.doOnNext(i -> System.out.println("i = " + i + "Thread: " + Thread.currentThread().getName()))
.flatMap(i -> {
System.out.println("end"+ i + " Thread: " + Thread.currentThread().getName());
return Mono.just(i);
}).blockLast();

如果我要根据文档自己的描述来理解这一点,我认为blockLast意味着阻止发布者(在这种情况下,直到所有1000个整数都成功发出,包括最后一个)。

之后调用。flatmap(…),一次一个(因为我们没有特别强制并行处理)。

但是,当运行时,我在控制台中看到以下内容:

i = 0Thread: main
end0 Thread: main
i = 1Thread: main
end1 Thread: main
i = 2Thread: main
end2 Thread: main
i = 3Thread: main
end3 Thread: main
i = 4Thread: main
end4 Thread: main
i = 5Thread: main

难道i = 0Thread: main应该先运行到i = 1000Thread: main然后.flatMap被执行?

i = 0Thread: main
i = 1Thread: main
i = 2Thread: main
i = 3Thread: main
i = 4Thread: main
.
.
end1 Thread: main
end2 Thread: main
end3 Thread: main

如果使用.subscribe(),则行为完全相同。我有点糊涂了。

观察到的行为是好的。Flux描述了在发出元素时执行的一系列操作。

因此,在您的示例中,range生成的每个整数立即由链中的下一个操作处理,即这里的flatMap。

与标准java.util.stream.Stream API的行为相同。

这种行为的原因是双重的:

  • 避免在每个处理步骤之间缓冲所有元素
  • 数据源可以发出无限数量的消息。它还可以发射各种频率的消息(恒定延迟或不延迟,非常快或非常慢等)。因此,流API被设计为处理并返回每个元素,只要它被接收,独立于它之前或之后的消息。

关于blockLast:在内部,它订阅flux,并等待完成或错误信号返回或抛出错误给用户。

最新更新