publishOn与subscribeOn在Reactor中进行阻塞调用时的比较



在博客文章Flight of the Flux 3中,作者建议用subscribeOn调用包装Mono中的同步阻塞调用,如文章中的片段所示:

final Flux<String> betterFetchUrls(List<String> urls) {
return Flux.fromIterable(urls)
.flatMap(url -> 
//wrap the blocking call in a Mono
Mono.fromCallable(() -> blockingWebClient.get(url))
//ensure that Mono is subscribed in an boundedElastic Worker
.subscribeOn(Schedulers.boundedElastic())
); //each individual URL fetch runs in its own thread!
}

但在同一篇文章的早些时候,他们展示了可以使用publishOn来确保阻塞调用在一个单独的线程上完成:

Flux.fromIterable(firstListOfUrls) //contains A, B and C
.publishOn(Schedulers.boundedElastic())
.map(url -> blockingWebClient.get(url))
.subscribe(body -> System.out.println(Thread.currentThread().getName + " from first list, got " + body));

既然如此,为什么不直接使用publishOn来实现betterFetchUrls方法呢?

final Flux<String> betterFetchUrls(List<String> urls) {
return Flux.fromIterable(urls)
.publishOn(Schedulers.boundedElastic())
.map(url -> blockingWebClient.get(url));
}

这不是更简单吗?附录C中的反应堆参考手册也对MonosubscribeOn进行了包装调用,所以我认为肯定有一个更可取的原因,但我不知道这个原因可能是什么。

感谢您的真知灼见。

您的代码实际上很好,除了必须使用flatMap而不是map作为:

final Flux<String> betterFetchUrls(List<String> urls) {
return Flux.fromIterable(urls)
.publishOn(Schedulers.boundedElastic())
.flatMap(url -> blockingWebClient.get(url));
}

subscribeOnpublishOn都进行上下文切换,因此两者都是非常昂贵的操作(不过我不知道哪一个更昂贵,我希望能提供一些相关文档(。但是,通常接受的进行阻塞调用的方法是使用文档中提到的subscribeOn

这篇博客文章旨在解释这两个运算符。此外,subscribeOn在第二个片段中可能更可取的原因在文本中公开:

但是,如果url获取方法是由其他人编写的,而他们很遗憾地忘记添加publishOn,该怎么办?是否有方法影响线程上游?

是的,当publishOn解决方案可用时,它可能更易于阅读和遵循。也就是说,如果你对外部序列没有控制权,而你只想返回一个Mono,确保它的工作在你选择的线程上完成,那么.subscribeOn将提供更多的保证。

最新更新