Spring 中的异步消息网关与 RxNetty 集成



我正在尝试将Spring 集成中的异步消息传递网关与 RxNetty(异步 HTTP)结合使用。基本上我想要的是将一个可观察/可处理的未来返回给调用线程,并使用可观察的zip/map/flatmap进行调用线程中的一堆出站 HTTP 调用。我只是想看看这是否很有可能。另外,如果不是使用 Rxjava 构造,我最好使用聚合器 eip 来构建一个简单的工作流程。

从版本4.1开始,网关可以返回 Reactor 2.0Promise<?>

@MessagingGateway
public static interface TestGateway {
@Gateway(requestChannel = "promiseChannel")
Promise<Integer> multiply(Integer value);
}
...
@ServiceActivator(inputChannel = "promiseChannel")
public Integer multiply(Integer value) {
return value * 2;
}
...
Streams.defer(Arrays.asList("1", "2", "3", "4", "5"))
.get()
.map(Integer::parseInt)
.mapMany(integer -> testGateway.multiply(integer))
.collect()
.consume(integers -> ...)
.flush();

从版本5.0开始,已更改为反应堆3.1Mono

我很确定有一些适配器可以将这种类型转换为对 RxJava 有价值的东西。

自版本4.2起,网关也支持该CompletableFuture<?>

CompletableFuture<String> process(String data);
...
CompletableFuture result = process("foo")
.thenApply(t -> t.toUpperCase());
...
String out = result.get(10, TimeUnit.SECONDS);

http://docs.spring.io/spring-integration/reference/html/messaging-endpoints-chapter.html#async-gateway

最新更新