<Void> 从 SI 中的反应式单向处理程序订阅单声道



我已经在project Reactor上工作了一段时间了,但是我对SI还是个新手,特别是在SI中使用反应性组件的时候。这可能是非常基本的,但我仍然不能弄清楚,我读得越多的Spring文档,我理解得越少。

我有这个流,我使用一个返回Mono的响应适配器。据我所知,一旦您使用返回voidMono<Void>(单向'MessageHandler')的处理程序/网关,则表示流的结束,您不能在下游添加任何其他内容。所以剩下的就是:

@Bean
public IntegrationFlow flow(AdapterThatReturnsMonoVoid adapter) {
return f -> f
// transformations, routing and other stuff..
.handle(adapter::handleMessage)      
}

。如何订阅Mono ?如果我想订阅那个Mono的频道呢?即使Mono不发出任何项目,我也希望能够订阅完成信号。

例如,如果我使用MongoDb.reactiveOutboundChannelAdapter(...),当我运行流时,我可以看到有效负载实际上被存储在mongo中。所以我猜订阅是自动完成的?

我想我的目的是得到某种手术完成的收据。

谢谢你的帮助!

MongoDb.reactiveOutboundChannelAdapter(...)ReactiveMongoDbStoringMessageHandler包装成ReactiveMessageHandlerAdapter来表示,真正实现自动订阅。

如果服务激活器被标记为async,则任何Publisher应答都将发生相同的情况。所以,如果你的adapter::handleMessage返回Mono<Void>,你可以这样做你会得到一个完整的Mono:

@Bean
public IntegrationFlow reactiveStore() {
return f -> f
.<Object>handle((p, h) -> Mono.empty(),
e -> e.async(true)
.customizeMonoReply((message, mono) ->
mono.doOnSuccess(data -> System.out.println("Completed: " + data))));
}

我在日志中得到这个:

Completed: null

我可能需要修改ReactiveMessageHandlerAdapter,因为看起来任何ReactiveMessageHandler都可以直接更顺利地处理。请提出一个GH问题,这样我们就不会失去我们的讨论。

与此同时,这里有一些文档来澄清更多:

https://docs.spring.io/spring-integration/docs/current/reference/html/reactive-streams.html reactive-message-handler

https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html reactive-advice

最新更新