为什么来自项目 Reactor 的 Schedulers.newParallel() 在 Flux 完成发射元素后没有停止运行?



我有一个Strings的原语Flux,并在main()方法中运行此代码。

package com.example;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.util.Logger;
import reactor.util.Loggers;

import java.util.Arrays;
import java.util.List;

public class Parallel {

private static final Logger log = Loggers.getLogger(Parallel.class.getName());

private static List<String> COLORS = Arrays.asList("red", "white", "blue");

public static void main(String[] args) throws InterruptedException {

Flux<String> flux = Flux.fromIterable(COLORS);

flux
.log()
.map(String::toUpperCase)
.subscribeOn(Schedulers.newParallel("sub"))
.publishOn(Schedulers.newParallel("pub", 1))
.subscribe(value -> {
log.info("==============Consumed: " + value);
});
}
}

如果你尝试运行这段代码,应用程序永远不会停止运行,你需要手动停止它。如果我用.parallel()代替.newParallel(),一切都按预期工作,应用程序正常完成。

为什么它不能自己完成运行?为什么会挂起来?这种行为的原因是什么?

如果你运行这段代码作为一个JUnit测试,它工作良好,它不会挂起。

您自己用newXxx工厂方法创建的Scheduler实例在默认情况下以非守护进程模式创建,这意味着它可以防止JVM退出。

JUnit在所有测试运行后调用System.exit(),这解释了为什么测试场景没有挂起。

在这种情况下,Schedulers.newSingle()Schedulers.newParallel()变体是最糟糕的"冒犯者",因为创建的线程在非活动超时后不会被剔除,这与Schedulers.newBoundedElastic()不同。

如果在现实世界的场景中,您有一个定义良好的应用程序生命周期,您可以将Scheduler实例存储在某个地方(例如:作为bean),并确保在应用程序生命周期结束时调用每个Scheduler#dispose()

更简单的解决方案:使用相关的工厂过载显式地创建Schedulersdaemon == true

最新更新