如何在reactive Spring Integration中处理reactive类型



我正在玩一点Reactive Spring Integration,我尝试执行以下基本操作:

@Override
protected IntegrationFlowDefinition<?> buildFlow() {
return from(someReactiveInboundChannelAdapter)
.handle(new ReactiveMessageHandlerAdapter(flux -> flux.subscribe(System.out::println)));
}

但是这不起作用,因为框架不会识别fluxFlux<?>实例。框架将其视为Message<?>,我不知道如何以及从哪里开始编写Reactor代码。

这是正确的。ReactiveMessageHandlerAdapter需要ReactiveMessageHandler的lambda,该合约为:
Mono<Void> handleMessage(Message<?> message);

我不确定是什么驱使你认为输入必须是Flux

我假设您的someReactiveInboundChannelAdapterMessageProducerSupport的扩展,并且完全执行subscribeToPublisher(Publisher<? extends Message<?>> publisher)逻辑。这样做的目的是以被动的方式从源获取数据,但仍然将每个事件作为消息发送给下游通道。因此,应该清楚的是,handle()将接收到带有单个项目的消息,而不是整个Flux

如果您想将流量视为通量,请考虑使用fluxTransform()运算符。

此外,最好不要自己订阅,而是在配置和启动结束后,在框架中进行订阅。

从技术上讲,你不应该考虑反应型。您只需要分别配置一个流,并只为单个项目编写一个逻辑:框架为您进行反应式交互。Project Reactor是一个围绕FluxMono的库。这就是我们谈论反应型的地方。Spring Integration是一个消息传递框架,它的通信是通过消息完成的。最后,对于每一条消息,端点之间的交互是否以反应式方式完成一定无关紧要。因此,您的处理逻辑可以不受反应类型的影响。

UDATE

如果你想从someReactiveInboundChannelAdapter完全控制Flux,那么你需要这样做:

@Bean
public Publisher<Message<Object>> reactiveFlow() {
return IntegrationFlows.from(someReactiveInboundChannelAdapter)
.toReactivePublisher();
}

然后在需要的时候注入Publisher。然后执行Flux.from(publisher)和任何需要的反应运算符,包括subscribe()

这里有一些样本:https://github.com/spring-projects/spring-integration/blob/main/spring-integration-core/src/test/java/org/springframework/integration/dsl/reactivestreams/ReactiveStreamsTests.java

IntegrationFlowAdapter不能用于这种类型的配置,因为它不能接受IntegrationFlow作为buildFlow()的结果。

不过,fluxTransform()也可以为您做到这一点:

.fluxTransform(flux -> flux.as(Mono::just))

因此,下游流的有效载荷将是Flux<Message<?>>,您可以自己处理。从该fluxTransform()返回的Mono将被框架订阅。其价值的Flux是您在下游流程中的责任。然后你可以使用普通的MessageHandler:

.handle(m -> ((Flux<Message<?>>) m.getPayload())....subscribe())

最新更新