弹簧反应器Webflux调度器并行性



对于完全无阻塞的端到端反应式调用,是否建议显式调用publishOn或subscribeOn来切换调度程序?对于消耗 CPU 或非消耗 CPU 的任务,是否有利于始终使用并行通量来优化性能?

对于完全无阻塞的端到端反应式调用,是否建议显式调用 publishOn 或 subscribeOn 来切换调度程序?

publishOn在将数据发布到下游时使用,而subscribeOn在上游使用数据时使用。所以这真的取决于你想做什么样的工作。

对于消耗 CPU 或非消耗 CPU 的任务,是否有利于始终使用并行通量来优化性能?

绝对不是,请考虑以下示例:

Flux.range(1, 10)
.parallel(4)
.runOn(Schedulers.parallel())
.sequential()
.elapsed()
.subscribe(i -> System.out.printf(" %s ", i));

上面的代码完全是浪费,因为i几乎可以立即处理。以下代码的性能将比上述更好:

Flux.range(1, 10)
.elapsed()
.subscribe(i -> System.out.printf(" %s ", i));

现在考虑一下:

public static <T> T someMethodThatBlocks(T i, int ms) {
try { Thread.sleep( ms ); }
catch (InterruptedException e) {}
return i;
}
// some method here
Flux.range(1, 10)
.parallel(4)
.runOn(Schedulers.parallel())
.map(i -> someMethodThatBlocks(i, 200))
.sequential()
.elapsed()
.subscribe(i -> System.out.printf(" %s ", i));

输出类似于:

[210,3]  [5,1]  [0,2]  [0,4]  [196,6]  [0,8]  [0,5]  [4,7]  [196,10]  [0,9] 

如您所见,第一个响应是在210毫秒后出现的,然后是 3 个响应,中间经过了近0个时间。这个循环一遍又一遍地重复。这是您应该使用并行通量的地方。请注意,创建更多数量的线程并不能保证性能,因为当线程数量更多时,上下文切换会增加大量开销,因此应在部署之前对代码进行测试。如果有很多阻塞调用,每个 CPU 的线程数超过 1 个可能会提高性能,但如果进行的调用是 CPU 密集型的,则每个 CPU 有多个线程会由于上下文切换而降低性能。

总而言之,这始终取决于您想要实现的目标。

值得一提的是,我假设这里的上下文是Webflux而不是一般的反应堆(因为这个问题被标记为这样。当然,如果我们谈论的是通用的反应堆用例而不考虑Webflux,那么建议可能会有很大差异。

对于完全无阻塞的端到端反应式调用,是否建议显式调用 publishOn 或 subscribeOn 来切换调度程序?

一般建议不要显式调用这些方法,除非您有理由这样做。(在正确的上下文中使用它们并没有错,但"仅仅因为"这样做可能不会带来任何好处。

对于消耗 CPU 或非消耗 CPU 的任务,是否有利于始终使用并行通量来优化性能?

这取决于您要实现的目标,以及"CPU 消耗"(或 CPU 密集型(任务的含义。请注意,这里我说的是真正的CPU 密集型任务,而不是阻塞代码 - 在这种情况下,理想情况下,我会将 CPU 密集型部分转移到另一个微服务,使您能够根据需要将其与 Webflux 服务分开进行扩展。

使用并行通量(并在并行调度程序上运行它(应该使用所有可用的内核来处理数据 - 这可能会导致数据处理速度更快。但请记住,默认情况下,您还为每个内核运行一个事件循环,因此您基本上从事件循环中"窃取"了一些可用容量以实现这一点。这是否理想取决于您的用例,但通常不会带来太多好处。

相反,我推荐两种方法:

  • 如果可以将 CPU 密集型任务分解为小的、低强度的块,请这样做 - 然后您可以将其保留在事件循环中。这允许事件循环保持及时运行,同时按可能安排这些 CPU 密集型任务。
  • 如果无法分解它,请启动一个单独的调度程序(可以选择使用低优先级,以便不太可能从事件循环中窃取资源(,然后将所有 CPU 密集型任务都用于此。这样做的缺点是会创建更多的线程,但再次保持事件循环自由。默认情况下,您的线程数与事件循环的内核数一样多 - 您可能希望减少该线程数,以便为"CPU 密集型"调度程序提供更多内核。

最新更新