在多个 Fluxes 结束时同步,例如 subscribe() 和 then()



>我有不同的Flux可以做一些事情,例如将值存储在数据库中或简单地打印出来。所有这些都是有限的,我想在所有Flux完全消耗后执行一个操作:

public void run(String... args) {
Flux<String> firstFlux = Flux.just("a", "b", "c");
Flux<Integer> secondFlux = Flux.just(1, 2, 3);
Flux<Object> thirdFlux = Flux.just(1, "2", 3);
firstFlux
.doOnComplete(() -> log.info("first Flux has completed."))
.subscribe(s -> insertIntoDbString(s));
secondFlux
.doOnComplete(() -> log.info("second Flux has completed."))
.subscribe(i -> insertIntoDbInteger(i));
thirdFlux
.doOnComplete(() -> log.info("third Flux has completed."))
.subscribe(o -> insertIntoDbObject(o));
// do something after all Fluxes have completed:
// log.info("all Fluxes have completed.");
}
private void insertIntoDbObject(Object s) {
log.info("inserting Object: {}", s);
}
private void insertIntoDbInteger(Integer s) {
log.info("inserting Integer: {}", s);
}
private void insertIntoDbString(String s) {
log.info("inserting String: {}", s);
}

我怎样才能做到这一点?我想,我可以为每个Flux使用then()来接收Mono<Void>,收集这些并使用那里的doOnComplete(),但那时我似乎无法再订阅Flux了。

谢谢。

如果您不关心传递每个通量发出的值,只需完成每个通量,那么最合适的运算符是Mono.when。它清楚地表明,您不需要发射的数据,只需要通量完成的信息。

Mono.when(
firstFlux
.doOnComplete(() -> log.info("first Flux has completed."))
.doOnNext(s -> insertIntoDbString(s)),
secondFlux
.doOnComplete(() -> log.info("second Flux has completed."))
.doOnNext(i -> insertIntoDbInteger(i)),
thirdFlux
.doOnComplete(() -> log.info("third Flux has completed."))
.doOnNext(o -> insertIntoDbObject(o))
).doOnSuccess(aVoid -> log.info("all Fluxes have completed."));

您可以使用Flux.merge合并所有通量,并通过如下doOnNext替换订阅方法:

Flux.merge(
firstFlux
.doOnComplete(() -> log.info("first Flux has completed."))
.doOnNext(s -> insertIntoDbString(s)),
secondFlux
.doOnComplete(() -> log.info("second Flux has completed."))
.doOnNext(s -> insertIntoDbInteger(s)),
thirdFlux
.doOnComplete(() -> log.info("third Flux has completed."))
.doOnNext(s -> insertIntoDbObject(s))
)
.doOnComplete(() -> log.info("All flux has completed."))

相关内容

最新更新