我尝试在不将流声明拆分为两个函数的情况下访问Spring Integration中的流量对象。我想知道如何执行以下操作:
@Bean
public IntegrationFlow mainFlow() {
return IntegrationFlows.from(somePublisher)
// Access publisher here to perform something like:
.handle(flux -> flux.buffer(Duration.ofMillis(200))
.handle(writeToS3)
.get();
}
我不介意将我在评论中谈到的通量操作转移到另一个类(也许是作为某种网关(,但对我来说,从同一个mainFlow
函数开始和流显然非常重要,因此理解我在应用程序中所做的事情将非常清晰和可读。我看到了Monos的网关文档,但示例代码甚至不可能(他们谈论的Flux不在函数中,作为初学者,我很难理解那里发生了什么(。
IntegrationFlows.from(somePublisher)
为所提供的Publisher
启动一个实际流。流程的其余部分针对源Publisher
中的每个单个事件进行。因此,只有当来自源的事件是Flux
时,您的.handle(flux ->)
才能工作。
如果你想将buffer()
应用于源Publisher
并继续下去,那么考虑使用reactive()
自定义程序:https://docs.spring.io/spring-integration/docs/current/reference/html/reactive-streams.html#fluxmessagechannel-和电抗流消除器。
因此,我将使用:而不是handle()
.bridge(e -> e.reactive(flux -> flux.buffer(Duration.ofMillis(200)))
下一个handle(writeToS3)
将针对缓冲的List
结果执行,所以要小心您在writeToS3
中正在做什么和期望做什么。
Artem的答案很好,也很正确,但我想要的是:
.fluxTransform(messageFlux -> messageFlux.buffer(Duration.ofMillis(200)))