我正在玩一点Reactive Spring Integration,我尝试执行以下基本操作:
@Override
protected IntegrationFlowDefinition<?> buildFlow() {
return from(someReactiveInboundChannelAdapter)
.handle(new ReactiveMessageHandlerAdapter(flux -> flux.subscribe(System.out::println)));
}
但是这不起作用,因为框架不会识别flux
是Flux<?>
实例。框架将其视为Message<?>
,我不知道如何以及从哪里开始编写Reactor代码。
ReactiveMessageHandlerAdapter
需要ReactiveMessageHandler
的lambda,该合约为:
Mono<Void> handleMessage(Message<?> message);
我不确定是什么驱使你认为输入必须是Flux
。
我假设您的someReactiveInboundChannelAdapter
是MessageProducerSupport
的扩展,并且完全执行subscribeToPublisher(Publisher<? extends Message<?>> publisher)
逻辑。这样做的目的是以被动的方式从源获取数据,但仍然将每个事件作为消息发送给下游通道。因此,应该清楚的是,handle()
将接收到带有单个项目的消息,而不是整个Flux
。
如果您想将流量视为通量,请考虑使用fluxTransform()
运算符。
此外,最好不要自己订阅,而是在配置和启动结束后,在框架中进行订阅。
从技术上讲,你不应该考虑反应型。您只需要分别配置一个流,并只为单个项目编写一个逻辑:框架为您进行反应式交互。Project Reactor是一个围绕Flux
和Mono
的库。这就是我们谈论反应型的地方。Spring Integration是一个消息传递框架,它的通信是通过消息完成的。最后,对于每一条消息,端点之间的交互是否以反应式方式完成一定无关紧要。因此,您的处理逻辑可以不受反应类型的影响。
UDATE
如果你想从someReactiveInboundChannelAdapter
完全控制Flux
,那么你需要这样做:
@Bean
public Publisher<Message<Object>> reactiveFlow() {
return IntegrationFlows.from(someReactiveInboundChannelAdapter)
.toReactivePublisher();
}
然后在需要的时候注入Publisher
。然后执行Flux.from(publisher)
和任何需要的反应运算符,包括subscribe()
。
这里有一些样本:https://github.com/spring-projects/spring-integration/blob/main/spring-integration-core/src/test/java/org/springframework/integration/dsl/reactivestreams/ReactiveStreamsTests.java
IntegrationFlowAdapter
不能用于这种类型的配置,因为它不能接受IntegrationFlow
作为buildFlow()
的结果。
不过,fluxTransform()
也可以为您做到这一点:
.fluxTransform(flux -> flux.as(Mono::just))
因此,下游流的有效载荷将是Flux<Message<?>>
,您可以自己处理。从该fluxTransform()
返回的Mono
将被框架订阅。其价值的Flux
是您在下游流程中的责任。然后你可以使用普通的MessageHandler
:
.handle(m -> ((Flux<Message<?>>) m.getPayload())....subscribe())