Java反应器' subscribe '有时阻塞,有时不阻塞



我已经玩了一段时间的反应器,但我仍然需要得到一些东西。

这段代码

Flux.range(1, 1000)
.delayElements(Duration.ofNanos(1))
.map(integer -> integer + 1)
.subscribe(System.out::println);
System.out.println("after");

的回报:

after
2
3
4

,预期作为订阅状态的文档:this will immediately return control to the calling thread.

那么,为什么这段代码:

Flux.range(1, 1000)
.map(integer -> integer + 1)
.subscribe(System.out::println);

返回
1
2
...
1000
1001
after

我永远无法弄清楚subscribe何时会阻塞或不阻塞,这在编写批处理时非常烦人。

如果有人知道答案,那就太棒了

代码段中没有阻塞代码。

在第一个例子中,你使用.delayElements(),它切换到另一个线程执行,并释放主线程。所以你可以看到你的System.out.println("after");在主线程中立即执行,而反应链正在parallel-n线程上执行。

你的第一个例子:

18:49:29.195 [main] INFO com.example.demo.FluxTest - AFTER
18:49:29.199 [parallel-1] INFO com.example.demo.FluxTest - v: 2
18:49:29.201 [parallel-2] INFO com.example.demo.FluxTest - v: 3
18:49:29.202 [parallel-3] INFO com.example.demo.FluxTest - v: 4
18:49:29.203 [parallel-4] INFO com.example.demo.FluxTest - v: 5
18:49:29.205 [parallel-5] INFO com.example.demo.FluxTest - v: 6

但是你的第二个例子没有切换执行线程,所以你的反应链在主线程上执行。完成后,它继续执行System.out.println("after");

18:51:28.490 [main] INFO com.example.demo.FluxTest - v: 995
18:51:28.490 [main] INFO com.example.demo.FluxTest - v: 996
18:51:28.490 [main] INFO com.example.demo.FluxTest - v: 997
18:51:28.490 [main] INFO com.example.demo.FluxTest - v: 998
18:51:28.490 [main] INFO com.example.demo.FluxTest - v: 999
18:51:28.490 [main] INFO com.example.demo.FluxTest - v: 1000
18:51:28.490 [main] INFO com.example.demo.FluxTest - v: 1001
18:51:28.491 [main] INFO com.example.demo.FluxTest - AFTER

编辑:如果您想在第二个代码片段中切换线程,基本上有两个选项:

  1. 在反应链的任何位置添加subscribeOn(<Scheduler>)。然后,整个订阅过程将在您提供的调度程序的线程上进行。

  2. 例如,在Flux.range()之后添加publishOn(<Scheduler>),那么发射本身将在调用线程上发生,但下游将在您提供的调度程序中的线程上执行

最新更新