我已经玩了一段时间的反应器,但我仍然需要得到一些东西。
这段代码
Flux.range(1, 1000)
.delayElements(Duration.ofNanos(1))
.map(integer -> integer + 1)
.subscribe(System.out::println);
System.out.println("after");
的回报:
after
2
3
4
,预期作为订阅状态的文档:this will immediately return control to the calling thread.
那么,为什么这段代码:
Flux.range(1, 1000)
.map(integer -> integer + 1)
.subscribe(System.out::println);
返回1
2
...
1000
1001
after
我永远无法弄清楚subscribe
何时会阻塞或不阻塞,这在编写批处理时非常烦人。
如果有人知道答案,那就太棒了
代码段中没有阻塞代码。
在第一个例子中,你使用.delayElements()
,它切换到另一个线程执行,并释放主线程。所以你可以看到你的System.out.println("after");
在主线程中立即执行,而反应链正在parallel-n
线程上执行。
你的第一个例子:
18:49:29.195 [main] INFO com.example.demo.FluxTest - AFTER
18:49:29.199 [parallel-1] INFO com.example.demo.FluxTest - v: 2
18:49:29.201 [parallel-2] INFO com.example.demo.FluxTest - v: 3
18:49:29.202 [parallel-3] INFO com.example.demo.FluxTest - v: 4
18:49:29.203 [parallel-4] INFO com.example.demo.FluxTest - v: 5
18:49:29.205 [parallel-5] INFO com.example.demo.FluxTest - v: 6
但是你的第二个例子没有切换执行线程,所以你的反应链在主线程上执行。完成后,它继续执行System.out.println("after");
18:51:28.490 [main] INFO com.example.demo.FluxTest - v: 995
18:51:28.490 [main] INFO com.example.demo.FluxTest - v: 996
18:51:28.490 [main] INFO com.example.demo.FluxTest - v: 997
18:51:28.490 [main] INFO com.example.demo.FluxTest - v: 998
18:51:28.490 [main] INFO com.example.demo.FluxTest - v: 999
18:51:28.490 [main] INFO com.example.demo.FluxTest - v: 1000
18:51:28.490 [main] INFO com.example.demo.FluxTest - v: 1001
18:51:28.491 [main] INFO com.example.demo.FluxTest - AFTER
编辑:如果您想在第二个代码片段中切换线程,基本上有两个选项:
在反应链的任何位置添加
subscribeOn(<Scheduler>)
。然后,整个订阅过程将在您提供的调度程序的线程上进行。例如,在
Flux.range()
之后添加publishOn(<Scheduler>)
,那么发射本身将在调用线程上发生,但下游将在您提供的调度程序中的线程上执行