如何合并活动-反应流



我刚刚开始探索项目reactor,我目前对它的主要目的的理解是以异步方式从多个来源获取信息。例如,我有一个源,它快速生成数据,而另一个源生成数据的速度非常慢。我想合并这两个来源,并尽快从他们两个返回信息,一旦我得到它。我试过这样的代码:

@RestController
public class ReactiveController {
@RequestMapping("/info")
public Flux<String> getInfoFromServices() {
return Flux.merge(getDataFromSlowSource(), getDataFromFastSource()).take(10);
}
private Flux<String> getDataFromFastSource() {
return Flux.generate(sink -> {
sink.next("data from fast sourcen");
try {
Thread.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
private Flux<String> getDataFromSlowSource() {
return Flux.generate(sink -> {
sink.next("data from slow sourcen");
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}

我期望从服务器得到这样的答案:

data from fast source
data from fast source
data from slow source
data from fast source
data from fast source
data from slow source
data from fast source
data from fast source
data from slow source

但我有:

data from slow source
data from slow source
data from slow source
data from slow source
data from slow source
data from slow source
data from slow source
data from slow source
data from slow source
data from slow source

那么,我可以在源生成数据后同时从两个源获取数据吗?

您的问题是Thread.sleep阻塞了当前线程。这就是你看到这种行为的原因。在快速和慢速源中使用delayElements,而不是使用Thread.sleep,您将看到预期的行为。

private Flux<Object> getDataFromFastSource() {
return Flux.generate(sink -> {
sink.next("data from fast sourcen");
}).delayElements(Duration.ofSeconds(1));
}
private Flux<Object> getDataFromSlowSource() {
return Flux.generate(sink -> {
sink.next("data from slow sourcen");
}).delayElements(Duration.ofSeconds(2));
}

注意:

始终使整个反应器链不阻塞,并使用适当的调度器来执行此操作。更多信息请点击此处。

http://www.vinsguru.com/reactive-programming-schedulers/

您的问题是合并操作、慢源和快源都在同一个线程上运行。因此,这两个来源之间不存在竞争。

如果你这样修改你的代码,使慢速和快速的源代码在不同的线程(调度器(上运行,你会看到预期的结果:

@RequestMapping("/info")
public Flux<String> getInfoFromServices() {
return Flux.merge(
getDataFromSlowSource().subscribeOn(Schedulers.boundedElastic(),
getDataFromFastSource().subscribeOn(Schedulers.boundedElastic()
).take(10);
}

关于vins的回答:delayElements(duration)方法也使用了一个调度器。

最新更新