Flux.then()在完成信号之前运行



我试图用Flux流对象做一些事情,在处理完所有元素后,我做了一些最后的工作并完成了Mono,但它不起作用:

// data and id comming from a webrequest
// myRepository is a org.springframework.data.r2dbc.repository.R2dbcRepository
myRepository.findById(id)
.flatMap(dbObject -> doSomethingWithDbObjectAndSave(dbObject , data))
.then (doOnFinish(data))
.subscribe();
Mono<DbObject> doSomethingWithDbObjectAndSave (DbObject dbo, DataObject data){
...
}
Mono<Void> doOnFinish(DataObject data){
...
}

问题是:即使我尝试了这个;doOnFinish;在第一个元素传递doSomethingWithDbObjectAndSave"之前调用;但我更改了数据对象上的某些内容,并且希望以前也这样做!

我试图更改代码:

myRepository.findById(id)
.flatMap(dbObject -> doSomethingWithDbObjectAndSave(dbObject , data))
.last()
.flatMap(dbObject  -> doOnFinish(data))
.subscribe(); 

我希望,我可以使用最后一个元素来触发onFinish函数,但我得到了"flux#last((没有观察到任何onnext信号";不要否认这一点!

有人知道吗?

then(methodCall(data))将急切地评估参数表达式,从而在输入then之前调用methodCall。您需要一个方法来惰性地评估其参数。

我认为您正在寻找doOnComplete:

public final Flux<T> doOnComplete(Runnable onComplete)

添加Flux成功完成时触发的行为(副作用(。

myRepository.findById(id)
.flatMap(dbObject -> doSomethingWithDbObjectAndSave(dbObject , data))
.doOnComplete(() -> doOnFinish(data))
.subscribe();

哇。。。几个小时后,我找到了解决方案。doOnComplete不会使用doOnFinish(Mono(的异步调用返回流。现在我发现了这个:

myRepository.findById(id)
.flatMap(dbObject -> doSomethingWithDbObjectAndSave(dbObject , data))
.takeLast(1)
.flatMap(dbObject  -> doOnFinish(data))
.subscribe();

这对我很管用。。。。

我认为flatMap在这里是多余的。相反,可以这样做:

myRepository.findById(id)
.flatMap(dbObject -> doSomethingWithDbObjectAndSave(dbObject , data))
.then (Mono.deffer(()->doOnFinish(data)))
.subscribe();

最新更新