在响应式编程中,doOnNext调用的顺序是否得到保证?



响应式编程新手。我的Flux中有一系列调用,我需要确保它们按顺序完成。如

Flux<Thing> flux = ...
.doOnNext(this::sendThing)
.doOnNext(this::persistThing)
.doOnError(error -> log.error("", error))
.blockLast();

我需要确保sendThingpersistThing之前完成。我不清楚,作为一个被动的新手,这是否得到保证。

根据文档:

Consumer被执行优先,则传播onNext信号下游。

因此,this::sendThing消费将在onNext信号之前完成传播到this::persistThing.

的例子:

Flux.range(1, 3)
.doOnNext(n -> log.info("doOnNext1"))
.log()
.doOnNext(n -> log.info("doOnNext2"))
.blockLast();
上面的代码片段输出如下内容:

  • doOnNext1
  • | onNext (1)
  • doOnNext2
  • doOnNext1
  • | onNext (2)
  • doOnNext2
  • doOnNext1
  • | onNext (3)
  • doOnNext2

onNext信号总是在"doOnNext2"打印之前传播。

请记住没有方法只能用于有副作用的手术(例如:日志)。

不熟悉这种响应式实现,但看起来像是在向单个可观察对象添加两个独立的订阅者,这并不能保证执行顺序。

如果send和persist是阻塞(同步)函数,你可以创建一些sendAndSync函数。

否则,你需要让'send'变成observable本身,而'persist'应该是它的订阅者。您可以通过在发送完成后向PublishSubject写入'send'来实现这一点,并且'persist'是该PublishSubject的订阅者(通过doOnNext)。

最新更新