如何访问Spring集成流中间的流量



我尝试在不将流声明拆分为两个函数的情况下访问Spring Integration中的流量对象。我想知道如何执行以下操作:

@Bean
public IntegrationFlow mainFlow() {
return IntegrationFlows.from(somePublisher)
// Access publisher here to perform something like:
.handle(flux -> flux.buffer(Duration.ofMillis(200))
.handle(writeToS3)
.get();
}

我不介意将我在评论中谈到的通量操作转移到另一个类(也许是作为某种网关(,但对我来说,从同一个mainFlow函数开始和流显然非常重要,因此理解我在应用程序中所做的事情将非常清晰和可读。我看到了Monos的网关文档,但示例代码甚至不可能(他们谈论的Flux不在函数中,作为初学者,我很难理解那里发生了什么(。

IntegrationFlows.from(somePublisher)为所提供的Publisher启动一个实际流。流程的其余部分针对源Publisher中的每个单个事件进行。因此,只有当来自源的事件是Flux时,您的.handle(flux ->)才能工作。

如果你想将buffer()应用于源Publisher并继续下去,那么考虑使用reactive()自定义程序:https://docs.spring.io/spring-integration/docs/current/reference/html/reactive-streams.html#fluxmessagechannel-和电抗流消除器。

因此,我将使用:而不是handle()

.bridge(e -> e.reactive(flux -> flux.buffer(Duration.ofMillis(200)))

下一个handle(writeToS3)将针对缓冲的List结果执行,所以要小心您在writeToS3中正在做什么和期望做什么。

Artem的答案很好,也很正确,但我想要的是:

.fluxTransform(messageFlux -> messageFlux.buffer(Duration.ofMillis(200)))

最新更新