反应器-异步/非阻塞



我认为下面的通量链将通过事件循环(如JS(放置/执行。因此,运行下面的代码将首先打印阻塞的CCD_ 1循环&则将执行通量链。

但是整个通量总是在它移动到for循环之前首先执行。[我确实有一些正在阻塞的sleep语句。但有两个doOnNext阶段]

AtomicInteger atomicInteger = new AtomicInteger(0);
// reactor
Flux.generate(synchronousSink -> {
if (atomicInteger.incrementAndGet() < 3) {
synchronousSink.next(atomicInteger.get());
} else
synchronousSink.complete();
})
.doOnNext(i -> {
System.out.println(
"A - Received " + i + " by " + Thread.currentThread().getName()
);
sleep(Duration.ofSeconds(1));
}).doOnNext(i -> {
System.out.println(
"B - Received " + i + " : by " + Thread.currentThread().getName()
);
sleep(Duration.ofSeconds(1));
}).subscribe();

for (int i = 0; i < 5; i++) {
System.out.println("For " + i + " by " + Thread.currentThread().getName());
sleep(Duration.ofMillis(500));
}

它打印

A - Received 1 by main
B - Received 1 by main
A - Received 2 by main
B - Received 2 by main
For 0 by main
For 1 by main
For 2 by main
For 3 by main
For 4 by main

有人能解释一下这种行为并回答这些问题吗?

  1. 当我们使用reactor时,通过使用一些调度器来实现异步/非阻塞行为的唯一方法
  2. 如果我不使用任何调度器,而是让代码使用当前线程执行,那么即使对于IO密集型应用程序,使用WebFlux而不是Spring MVC,我们还能期待更好的性能差异吗
  1. 线程阻塞不是Reactor用法的正确用法。要使其以非阻塞的方式工作,您应该使用publishOn/subscribeOn,然后输出应该是:
For 0 by main
A - Received 1 by boundedElastic-3
For 1 by main
For 2 by main
B - Received 1 : by boundedElastic-3
For 3 by main
For 4 by main
A - Received 2 by boundedElastic-3

有关publishOnsubscribeOn的更多信息,请参阅:链接

  1. 当然-Reactor支持HTTP(包括Websockets(、TCP和UDP的非阻塞。更重要的是,默认情况下Reactor在Netty服务器上工作,这改变了处理请求的方式。例如,在Tomcat中,请求响应是由同一个线程处理的,而且这个线程正在等待响应,所以它被阻塞了。在Netty中,一个线程可以处理发送请求,而另一个线程则可以处理接收响应——线程不会隐含地等待响应

最新更新