如何使用响应式编程实现嵌套异步代码?



我对响应式编程很陌生。尽管我对函数式编程和 kotlin 协程非常熟悉,但我仍然无法弄清楚如何使用响应式编程范式来重构普通嵌套的 CRUD 代码,尤其是那些具有嵌套异步操作的代码。

例如,下面是一个基于 Java 8 的简单异步 CRUD 代码片段CompletableFuture


getFooAsync(id)
.thenAccept(foo -> {
if (foo == null) {
insertFooAsync(id, new Foo());
} else {
getBarAsync(foo.bar)
.thenAccept(bar -> {
updateBarAsync(foo, bar);
});
}
});

使用 kotlin 协程重构它非常容易,这使得它的可读性更高,而不会失去异步性。

val foo = suspendGetFoo(id)
if(foo==null) {
suspendInsertFoo(id, Foo())
} else {
val bar = suspendGetBar(foo.bar)
suspendUpdateBar(foo, bar);-
}

但是,这样的代码是否适合响应式编程?

如果是这样,给定一个Flux<String> idFlux,如何使用反应堆 3 重构它?

Mono替换每个CompletableFuture是个好主意吗?

这样的代码适合响应式编程吗?

恕我直言,Kotlin 协程更适合此用例,并且会产生更干净的代码。

但是,您可以在反应式流中执行此操作。

用Mono替换每个CompletableFuture是个好主意吗?

我发现反应式流可以很好地处理许多异步用例(例如,来自项目反应堆的示例)。 但是,肯定有一些用例不太合适。 因此,我不能推荐用反应式流替换每个CompletableFuture的策略。

但是,您必须放弃CompletableFuture的一种情况是需要背压时。

关于使用哪种异步模式的很多决定取决于您正在使用的语言/框架/工具/库以及您和您的团队成员对它们的舒适度。 如果您使用的是具有良好 Kotlin 支持的库,并且您的团队熟悉 Kotlin,请使用协程。 反应式流也是如此。

给定一个Flux<String> idFlux,如何使用反应堆 3 重构它?

在考虑此用例的反应式流时,请记住以下几点:

  1. 反应式流无法发出null。 相反,通常使用空Mono。 (从技术上讲,您也可以使用Mono<Optional<...>>,但此时您只是在伤害大脑并乞求错误)
  2. Mono为空时,lambda 传递给处理onNext信号的任何运算符(例如.map.flatMap.handle等)不被调用。 请记住,您正在处理数据流(而不是命令性控制流)
  3. .switchIfEmpty.defaultIfEmpty操作员可以在空Mono上运行。 但是,它们不提供else条件。 下游操作员不知道流以前是空的(除非从传递给.switchIfEmpty的发布器发出的元素很容易以某种方式识别)
  4. 如果流包含多个运算符,并且多个运算符可能导致流变为空
  5. ,则下游运算符很难/不可能确定流变为空的原因
  6. 允许处理上游运算符发出的值的主要异步运算符是.flatMap.flatMapSequential.concatMap。 您将需要使用它们来链接对先前异步操作的输出进行操作的异步操作。
  7. 由于您的用例不返回值,因此反应式流实现将返回Mono<Void>

说了这么多,这里尝试将您的示例转换为反应堆 3(有一些警告):

Mono<Void> updateFoos(Flux<String> idFlux) {
return idFlux                                         // Flux<String>
.flatMap(id -> getFoo(id)                         // Mono<Foo>
/*
* If a Foo with the given id is not found,
* create a new one, and continue the stream with it.
*/
.switchIfEmpty(insertFoo(id, new Foo()))      // Mono<Foo>
/*
* Note that this is not an "else" condition
* to the above .switchIfEmpty
*
* The lambda passed to .flatMap will be
* executed with either:
* A) The foo found from getFoo
*    OR
* B) the newly inserted Foo from insertFoo
*/
.flatMap(foo -> getBar(foo.bar)               // Mono<Bar>
.flatMap(bar -> updateBar(foo, bar))      // Mono<Bar>
.then()                                   // Mono<Void>
)                                             // Mono<Void>
)                                                 // Flux<Void>
.then();                                          // Mono<Void>
}
/*
* @return the Foo with the given id, or empty if not found
*/
abstract Mono<Foo> getFoo(String id);
/*
* @return the Bar with the given id, or empty if not found
*/
abstract Mono<Bar> getBar(String id);
/*
* @return the Foo inserted, never empty
*/
abstract Mono<Foo> insertFoo(String id, Foo foo);
/*
* @return the Bar updated, never empty
*/
abstract Mono<Bar> updateBar(Foo foo, Bar bar);

下面是一个更复杂的示例,它使用Tuple2<Foo,Boolean>来指示是否找到了原始 Foo(这在语义上应该等同于您的示例):

Mono<Void> updateFoos(Flux<String> idFlux) {
return idFlux                                         // Flux<String>
.flatMap(id -> getFoo(id)                         // Mono<Foo>
/*
* Map to a Tuple2 whose t2 indicates whether the foo was found.
* In this case, it was found.
*/
.map(foo -> Tuples.of(foo, true))             // Mono<Tuple2<Foo,Boolean>>
/*
* If a Foo with the given id is not found,
* create a new one, and continue the stream with 
* a Tuple2 indicating it wasn't originally found
*/
.switchIfEmpty(insertFoo(id, new Foo())       // Mono<Foo>
/*
* Foo was not originally found, so t2=false
*/
.map(foo -> Tuples.of(foo, false)))       // Mono<Tuple2<Foo,Boolean>>
/*
* The lambda passed to .flatMap will be
* executed with either:
* A) t1=foo found from getFoo, t2=true
*    OR
* B) t1=newly inserted Foo from insertFoo, t2=false
*/
.flatMap(tuple2 -> tuple2.getT2()
// foo originally found 
? getBar(tuple2.getT1().bar)              // Mono<Bar>
.flatMap(bar -> updateBar(tuple2.getT1(), bar)) // Mono<Bar>
.then()                               // Mono<Void>
// foo originally not found (new inserted)
: Mono.empty()                            // Mono<Void>
)
)                                                 // Flux<Void>
.then();                                          // Mono<Void>
}

最新更新