尝试管理多个Flux/Mono,先启动其中一些,然后组合其中一些,结果有点迷失



我有一个接受实体ID的模块;分辨率类型";作为参数,然后通过返回Fluxes的多个操作异步收集数据(主要(。解析被分解为多个(主要是(异步操作,每个操作都致力于收集有助于解析的不同数据类型。我说";主要是";异步,因为某些解析类型需要一些必须同步进行的初步操作,以便为解析的其余异步Flux操作提供信息。现在,在进行同步操作的同时,至少可以开始整个异步解析操作的一部分。我想在同步操作进行时启动这些Flux操作。然后,一旦解决了同步数据,我就可以获得剩余操作的每个Flux。一些分辨率类型将使所有Flux操作返回数据,而另一些分辨率类型收集的信息较少,并且一些Flux操作将保持为空。解析操作非常耗时,我希望能够更早地开始一些Flux操作,这样我就可以压缩一点时间——这对我正在完成的任务非常重要。所以热切的订阅是理想的,只要我能保证我不会错过任何项目的排放。

考虑到这一点,我怎么能:

  1. 创建一个";支架";或一个";容器";对于解析所有内容所需的每个Flux操作,并将它们初始化为空(如Flux.empty()(
  2. 将项添加到我在上面的项1中可以创建的任何项中——它被初始化为空,但我可能想要来自一个或多个有限和异步Flux操作的数据,但我不想将它们分开,当我在它们上使用collectList()来生成Mono时,它们可以显示为一个流
  3. 当其中一些Flux操作应该在其他操作之前启动时,我如何启动它们,并确保不会丢失任何数据?例如,如果我开始一个名称解析Flux,我可以添加它吗,就像上面的第2项一样?假设我想开始检索一些数据,然后执行同步操作,然后根据同步操作的结果创建另一个名称解析Flux,我可以将这个新的Flux附加到原始名称解析Fluz上吗,因为它将返回相同的数据类型?我知道Flux.merge(),但如果可能的话,使用一个我可以不断添加的Flux引用会很方便

我是否需要一个集合对象,如列表,然后使用合并操作?最初,我考虑使用ConnectableFlux,直到我意识到它是用于连接多个订阅者,而不是用于连接多家发布者。连接多个出版商是我认为很好的解决方案,除非这是一种可以用更好的方式处理的常见模式。

我做反应式编程的时间很短,所以请耐心等待我描述我想做什么的方式。如果我能更好地澄清我的意图,请让我知道我不清楚的地方,我很乐意尝试澄清。提前感谢您的时间和帮助!

编辑:这是Kotlin的最终版本,简洁明了:

private val log = KotlinLogging.logger {}
class ReactiveDataService {
private val createMono: () -> Mono<List<Int>> = {
Flux.just(9, 8, 7)
.flatMap {
Flux.fromIterable(List(it) { Random.nextInt(0, 100) })
.parallel()
.runOn(Schedulers.boundedElastic())
}
.collectList()
.cache()
}
private val processResults: (List<String>, List<String>) -> String =
{ d1, d2 -> "ntdownstream 1: $d1ntdownstream 2: $d2" }
private val convert: (List<Int>, Int) -> Flux<String> =
{ data, multiplier -> Flux.fromIterable(data.map { String.format("%3d", it * multiplier) }) }
fun doQuery(): String? {
val mono = createMono()
val downstream1 = mono.flatMapMany { convert(it, 1) }.collectList()
val downstream2 = mono.flatMapMany { convert(it, 2) }.collectList()
return Mono.zip(downstream1, downstream2, processResults).block()
}
}
fun main() {
val service = ReactiveDataService()
val start = System.currentTimeMillis()
val result = service.doQuery()
log.info("{}ntTotal time: {}ms", result, System.currentTimeMillis() - start)
}

输出:

downstream 1: [ 66,  39,  40,  88,  97,  35,  70,  91,  27,  12,  84,  37,  35,  15,  45,  27,  85,  22,  55,  89,  81,  21,  43,  62]
downstream 2: [132,  78,  80, 176, 194,  70, 140, 182,  54,  24, 168,  74,  70,  30,  90,  54, 170,  44, 110, 178, 162,  42,  86, 124]
Total time: 209ms

这听起来是reactor的理想工作。可以使用弹性调度程序将同步调用包装为Fluxes(或Monos(返回,以允许它们并行执行。然后使用各种运算符,您可以将它们组合在一起,形成一个表示结果的通量。订阅Flux,整个机器就会启动。

我认为你需要使用Mono.flatMapMany而不是Flux.usingWhen.

public class ReactiveDataService {
public static void main(final String[] args) {
ReactiveDataService service = new ReactiveDataService();
service.doQuery();
}
private Flux<Integer> process1(final List<Integer> data) {
return Flux.fromIterable(data);
}
private Flux<Integer> process2(final List<Integer> data) {
return Flux.fromIterable(data).map(i -> i * 2);
}
private String process3(List<Integer> downstream1, List<Integer> downstream2) {
System.out.println("downstream 1: " + downstream1);
System.out.println("downstream 2: " + downstream2);
return "Done";
}
private void doQuery() {
final Mono<List<Integer>> mono =
Flux.just(9, 8, 7)
.flatMap(
limit ->
Flux.fromStream(
Stream.generate(() -> new Random().nextInt(100))
.peek(
i -> {
try {
Thread.sleep(500);
} catch (InterruptedException ignored) {
}
})
.limit(limit))
.parallel()
.runOn(Schedulers.boundedElastic()))
.collectList()
.cache();
final Mono<List<Integer>> downstream1 = mono.flatMapMany(this::process1).collectList();
final Mono<List<Integer>> downstream2 = mono.flatMapMany(this::process2).collectList();
Mono.zip(downstream1, downstream2, this::process3).block();
}
}

最新更新