我有这样一个需求。
Flux<Integer> s1 = .....;
s1.flatMap(value -> anotherSource.find(value));
我需要一种方法来阻止这个s1当anotherSource.find
给我第一个空。怎么做呢?
注意:
一个可能的解决方案是抛出错误,然后捕获它以停止。anotherSource.find(value).switchIfempty(Mono.error(..))
我在寻找比这更好的解决方案。
您不会找到一个特定的操作符,您将不得不组合操作符来实现它。(注意,这并不意味着它是"黑客"。从本质上讲,响应式框架通常是为了将基本操作符组合在一起以实现您的用例而使用的。
我同意使用错误来实现远不是理想的,尽管它可能会破坏反应链中真实错误的流-所以这应该是最后的手段。
在我希望流基于内部发布者停止的情况下,我通常采取的方法是实现内部流,过滤掉onComplete()
信号,然后在适当的地方重新添加onComplete()
(在这种情况下,如果它是空的)。然后,您可以使外部流非物质化,它将响应已完成的信号,无论您在何处注入它,停止流:
s1.flatMap(
value ->
anotherSource
.find(value)
.materialize()
.filter(s -> !s.isOnComplete())
.defaultIfEmpty(Signal.complete()))
.dematerialize()
这样做的优点是可以保留任何错误信号,同时也不需要其他对象或特殊值。