我已经在project Reactor上工作了一段时间了,但是我对SI还是个新手,特别是在SI中使用反应性组件的时候。这可能是非常基本的,但我仍然不能弄清楚,我读得越多的Spring文档,我理解得越少。
我有这个流,我使用一个返回Monovoid
或Mono<Void>
(单向'MessageHandler')的处理程序/网关,则表示流的结束,您不能在下游添加任何其他内容。所以剩下的就是:
@Bean
public IntegrationFlow flow(AdapterThatReturnsMonoVoid adapter) {
return f -> f
// transformations, routing and other stuff..
.handle(adapter::handleMessage)
}
。如何订阅Mono ?如果我想订阅那个Mono的频道呢?即使Mono不发出任何项目,我也希望能够订阅完成信号。
例如,如果我使用MongoDb.reactiveOutboundChannelAdapter(...)
,当我运行流时,我可以看到有效负载实际上被存储在mongo中。所以我猜订阅是自动完成的?
我想我的目的是得到某种手术完成的收据。
谢谢你的帮助!
MongoDb.reactiveOutboundChannelAdapter(...)
由ReactiveMongoDbStoringMessageHandler
包装成ReactiveMessageHandlerAdapter
来表示,真正实现自动订阅。
如果服务激活器被标记为async
,则任何Publisher
应答都将发生相同的情况。所以,如果你的adapter::handleMessage
返回Mono<Void>
,你可以这样做你会得到一个完整的Mono
:
@Bean
public IntegrationFlow reactiveStore() {
return f -> f
.<Object>handle((p, h) -> Mono.empty(),
e -> e.async(true)
.customizeMonoReply((message, mono) ->
mono.doOnSuccess(data -> System.out.println("Completed: " + data))));
}
我在日志中得到这个:
Completed: null
我可能需要修改ReactiveMessageHandlerAdapter
,因为看起来任何ReactiveMessageHandler
都可以直接更顺利地处理。请提出一个GH问题,这样我们就不会失去我们的讨论。
与此同时,这里有一些文档来澄清更多:
https://docs.spring.io/spring-integration/docs/current/reference/html/reactive-streams.html reactive-message-handler
https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html reactive-advice