reactor何时执行订阅链



反应堆文件说明如下:

在订阅之前不会发生任何事情

如果这是真的,为什么我在运行下面的代码片段时会看到java.lang.NullPointerException,它有一个没有订阅的反应器链?

@Test
void test() {
String a = null;
Flux.just(a.toLowerCase())
.doOnNext(System.out::println);
}

Deepak,

什么都不发生意味着在订阅发生之前,数据不会通过函数链流向消费者。

您得到NPE是因为Java试图计算在Flux定义步骤中给hot运算符just()的值。

您也可以使用defer()just()转换为冷运算符,这样您只有在订阅后才能收到NPE

public Flux<String> test() {
String a = null;
return Flux.defer(() -> Flux.just(a.toLowerCase()))
.doOnNext(System.out::println);
}

请阅读更多关于热操作和保持操作的信息。

更新:

冷热出版商的小例子。每次新订阅发生时,冷发布者的主体都会重新计算。同时,just()仅产生在定义时仅计算一次的时间。

Mono<Date> currentTime = Mono.just(Calendar.getInstance().getTime());
Mono<Date> realCurrentTime = Mono.defer(() -> Mono.just(Calendar.getInstance().getTime()));
// 1 sec sleep
Thread.sleep(1000);
currentTime.subscribe(time -> System.out.println("Current Time " + time.getTime()));
realCurrentTime.subscribe(time -> System.out.println("Real current Time " + time.getTime()));
Thread.sleep(2000);
currentTime.subscribe(time -> System.out.println("Current Time " + time.getTime()));
realCurrentTime.subscribe(time -> System.out.println("Real current Time " + time.getTime()));

输出为:

当前时间1583788755759

当前实时1583788756826

当前时间1583788755759

当前实时1583788758833

最新更新