我有一个Spring Itegration (5.0.2(流程,它从HTTP端点读取数据并使用HTTP响应中的(拆分(数据发布Kafka消息。
我想在流程完成之前执行一个">最终"操作,某种"尝试,捕获,最后",但我不确定实现这一目标的最佳方法是什么。这是我的代码:
@Bean
public IntegrationFlow startFlow() {
return IntegrationFlows.from(() -> new GenericMessage<>(""),
c -> c.id("flow1")
.poller(
Pollers.fixedDelay(period, TimeUnit.MILLISECONDS, initialDelay).taskExecutor(taskExecutor)))
.handle(Http.outboundGateway("http://....)
.charset("UTF-8")
.expectedResponseType(String.class)
.httpMethod(HttpMethod.GET))
.transform(new transformer())
.split()
.channel(CHANNEL_1)
.controlBus()
.get();
}
@Bean
public IntegrationFlow toKafka(KafkaTemplate<?, ?> kafkaTemplate) {
return IntegrationFlows.from(CHANNEL_1)
.handle(Kafka.outboundChannelAdapter(kafkaTemplate)
// KAFKA SPECIFIC
.get();
}
本质上,当所有消息都发送完毕时(请注意,我使用的是.split
(,我需要调用 Spring bean 来更新一些数据。
谢谢
如果你在拆分器之后谈论"当所有消息",那么你需要看看一个.aggregate()
:https://docs.spring.io/spring-integration/docs/5.0.3.RELEASE/reference/html/messaging-routing-chapter.html#aggregator。
拆分器将特殊的sequence details
标头填充到每个拆分的项目中,默认情况下,聚合器能够使用收到的消息中的这些标头将它们收集到单个实体。
由于您谈论发送到 Kafka 后的过程,因此您应该将CHANNEL_1
作为PublishSubscribeChannel
,并将上述.aggregate()
作为该频道的最后一个订阅者。更新某些数据的服务应该已经在此聚合器之后。