我有一个String
s的原语Flux
,并在main()
方法中运行此代码。
package com.example;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.util.Logger;
import reactor.util.Loggers;
import java.util.Arrays;
import java.util.List;
public class Parallel {
private static final Logger log = Loggers.getLogger(Parallel.class.getName());
private static List<String> COLORS = Arrays.asList("red", "white", "blue");
public static void main(String[] args) throws InterruptedException {
Flux<String> flux = Flux.fromIterable(COLORS);
flux
.log()
.map(String::toUpperCase)
.subscribeOn(Schedulers.newParallel("sub"))
.publishOn(Schedulers.newParallel("pub", 1))
.subscribe(value -> {
log.info("==============Consumed: " + value);
});
}
}
如果你尝试运行这段代码,应用程序永远不会停止运行,你需要手动停止它。如果我用.parallel()
代替.newParallel()
,一切都按预期工作,应用程序正常完成。
为什么它不能自己完成运行?为什么会挂起来?这种行为的原因是什么?
如果你运行这段代码作为一个JUnit测试,它工作良好,它不会挂起。
您自己用newXxx
工厂方法创建的Scheduler
实例在默认情况下以非守护进程模式创建,这意味着它可以防止JVM退出。
JUnit在所有测试运行后调用System.exit()
,这解释了为什么测试场景没有挂起。
在这种情况下,Schedulers.newSingle()
和Schedulers.newParallel()
变体是最糟糕的"冒犯者",因为创建的线程在非活动超时后不会被剔除,这与Schedulers.newBoundedElastic()
不同。
如果在现实世界的场景中,您有一个定义良好的应用程序生命周期,您可以将Scheduler
实例存储在某个地方(例如:作为bean),并确保在应用程序生命周期结束时调用每个Scheduler#dispose()
。
更简单的解决方案:使用相关的工厂过载显式地创建Schedulers
和daemon == true
。