Spring Reactive Programming:如何创建一个动态的发布者列表作为Flux.merge的输入



我是Spring Reactive编程的新手,我正在开发一个返回Flux的REST端点。例如:

@PostMapping
public Flux<MyResponse> processRequests(@RequestBody List<MyRequest> requests) {
return Flux.merge(Arrays.asList(dataSource.processRequest(requests.get(0)), dataSource2.processRequest(requests.get(0)))).parallel()
.runOn(Schedulers.elastic()).sequential();
}

示例代码中的每个数据源(dataSource和dataSource2(都实现了一个如下所示的接口:

public interface MyResponseAdapter {
Flux<MyResponse> processRequest(MyRequest request);
}

该代码的工作原理很好,因为它按预期返回Flux,但正如您所看到的,该代码只引用MyRequest列表中的第一个元素。我需要做的是为MyRequest列表中的每个元素构造Flux.merge。有人能给我指对方向吗?

我想我已经确定了一个简单的解决方案:

List<Flux<MyResponse>> results = new ArrayList<>();
for (MyRequest myRequest : requests ) {
results.add(dataSource.processRequest(myRequest));
results.add(dataSource2.processRequest(myRequest));
}
return Flux.merge(results).parallel().runOn(Schedulers.elastic()).sequential();

最新更新