弹簧反应器多个订阅



我正在尝试将我的单声道拆分为其他分离的单声道,这些单声道将在不同的线程上处理相同的数据输入数据。

public Mono<String> process() { 
Mono<String> someString = ... // fetching data from API 

someString
.publishOn(Schedulers.fromExecutorService(Executors.newFixedThreadPool(2)))
.map(String::toLowerCase)
.subscribe(this::saveLowercase);
someString
.publishOn(Schedulers.fromExecutorService(Executors.newFixedThreadPool(2)))
.map(String::toUpperCase)
.subscribe(this::saveUpperCase);
return someString;
}

我在日志中看到我获取了 3 次数据,因为每个订阅调用都从 API 获取数据。我想使用 .cache(( 方法,但我想知道一些新的更好的方法来只调用一次 API 并处理此数据?如果 Reactor 不能做到这一点,我可以将 Reactor 换成 RxJava。

在您给出的示例中,您创建了一个冷Mono。所以HTTP电话会针对每个subscriber进行,正如你所说。如果希望HTTP调用仅发生一次,请创建热可观察对象。您必须使用.cache()以便任何未来的订阅者都能获得响应。

这是执行此操作的正确方法。你为什么要寻找"更好的东西"?

官方文档中的示例:

DirectProcessor<String> hotSource = DirectProcessor.create();
Flux<String> hotFlux = hotSource.map(String::toUpperCase).cache();

hotFlux.subscribe(d -> System.out.println("Subscriber 1 to Hot Source: "+d));
hotSource.onNext("blue");
hotSource.onNext("green");
hotFlux.subscribe(d -> System.out.println("Subscriber 2 to Hot Source: "+d));
hotSource.onNext("orange");
hotSource.onNext("purple");
hotSource.onComplete(); 

两个订阅者现在都将获得所有颜色。

相关内容

  • 没有找到相关文章

最新更新