我有一个简单的Spring控制器,它有一个Spring服务作为依赖项。在服务类中,我有一个名为flag的int类型的静态volatile字段。当我通过控制器调用createFlux((方法时,标志被设置为5,然后创建一个新的Flux,它每秒检查一次标志,并根据标志值打印一条消息。由于delayElements方法的语义,代码将并行执行。之后,如果我调用changeFlag((方法,它会更改标志的值,并且由于标志变量是可变的,我希望打印的消息会更改,但这并没有发生。
这是代码:
@RestController
public class MyController {
@Autowired private MyService myService;
@GetMapping("createFlux")
public void createFlux() {
myService.createFlux();
}
@GetMapping("changeFlag")
public void changeFlag() {
myService.changeFlag();
}
}
@Service
public class MyService {
private static volatile int flag = 3;
public void changeFlag() {
flag = 3;
System.out.println("############# Flag = " + flag);
}
public void createFlux() {
flag = 5;
System.out.println("Flag = " + flag);
Flux.generate(sink -> {
if (flag == 3) {
sink.next("Stop");
} else {
sink.next("Start");
}
}).delayElements(Duration.ofSeconds(1)).subscribe(s -> System.out.println(Thread.currentThread().getName() + " : " + s));
}
}
这是控制台中的输出:
Flag = 5
parallel-1 : Start
parallel-2 : Start
parallel-3 : Start
parallel-4 : Start
############# Flag = 3
parallel-5 : Start
parallel-6 : Start
parallel-7 : Start
parallel-8 : Start
parallel-1 : Start
parallel-2 : Start
parallel-3 : Start
parallel-4 : Start
parallel-5 : Start
parallel-6 : Start
parallel-7 : Start
parallel-8 : Start
parallel-1 : Start
parallel-2 : Start
parallel-3 : Start
parallel-4 : Start
parallel-5 : Start
parallel-6 : Start
parallel-7 : Start
parallel-8 : Start
parallel-1 : Start
parallel-2 : Start
parallel-3 : Start
parallel-4 : Start
parallel-5 : Start
parallel-6 : Start
parallel-7 : Start
parallel-8 : Start
parallel-1 : Stop
parallel-2 : Stop
parallel-3 : Stop
parallel-4 : Stop
parallel-5 : Stop
parallel-6 : Stop
从输出中可以看出,即使flag的值更改为3,它也会继续打印消息Start。过了一段时间,打印的信息就变了。我想有一些缓存或类似的东西,但volatile变量没有被缓存。
问题是——这是一个bug还是我遗漏了什么?
生成使用者会立即为一堆元素执行。您可以通过在generate
:之后添加日志来轻松确认
Flux.generate(...).doOnNext(e -> log.info("executed: {}", e))
打印:
2022-01-20 13:31:50,346 INFO parallel-1 - Flag = 5
2022-01-20 13:31:50,349 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,351 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,352 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,352 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,352 INFO parallel-1 - executed: Start
2022-01-20 13:31:50,352 INFO parallel-1 - executed: Start
CCD_ 2方法基于来自下游的需求来发射元素。它首先生成32个元素并对它们进行缓冲。当下游开始处理元素时,以及当缓冲区大小降至阈值以下时,它会发出更多的元素。