如何通过 Flux 使用不同的调度程序运行两个任务



我尝试用两种策略(串行和并行(存储和解析和存储一些原始数据

Flux<PanasonicData> f = Flux.create(sink -> dataRepo.addConsumer(sink::next));
Flux.from(f).publishOn(Schedulers.single()).subscribe(this::save1);
Flux.from(f).publishOn(Schedulers.parallel()).map(MyClass::parse).subscribe(this::save2);

ConnectableFlux<PanasonicData> cf = Flux.create(sink -> dataRepo.addConsumer(sink::next)).publish();
cf.autoConnect().publishOn(Schedulers.single()).subscribe(this::save1);
cf.autoConnect().publishOn(Schedulers.parallel()).map(MyClass::parse).subscribe(this::save2);

但第二个任务从未运行!! 如何使用这两种不同的策略运行这两个任务?

您可以通过autoConnect(int minSubscribers)指定最小订阅者数量:

Flux<PanasonicData> cf = Flux.create(sink -> dataRepo.addConsumer(sink::next)).publish().autoConnect(2);
cf.publishOn(Schedulers.single()).subscribe(this::save1);
cf.publishOn(Schedulers.parallel()).map(MyClass::parse).subscribe(this::save2);

相关内容

  • 没有找到相关文章

最新更新