弹簧集成:在流程完成时调用服务



我有一个Spring Itegration (5.0.2(流程,它从HTTP端点读取数据并使用HTTP响应中的(拆分(数据发布Kafka消息。

我想在流程完成之前执行一个">最终"操作,某种"尝试,捕获,最后",但我不确定实现这一目标的最佳方法是什么。这是我的代码:

@Bean
public IntegrationFlow startFlow() {
   return IntegrationFlows.from(() -> new GenericMessage<>(""),
  c -> c.id("flow1")
    .poller(
      Pollers.fixedDelay(period, TimeUnit.MILLISECONDS, initialDelay).taskExecutor(taskExecutor)))
  .handle(Http.outboundGateway("http://....)
     .charset("UTF-8")
    .expectedResponseType(String.class)
    .httpMethod(HttpMethod.GET))
  .transform(new transformer())
  .split()
  .channel(CHANNEL_1)
  .controlBus()
  .get();
}
@Bean
public IntegrationFlow toKafka(KafkaTemplate<?, ?> kafkaTemplate) {
return IntegrationFlows.from(CHANNEL_1)
  .handle(Kafka.outboundChannelAdapter(kafkaTemplate)
    // KAFKA SPECIFIC
  .get();
}

本质上,当所有消息都发送完毕时(请注意,我使用的是.split(,我需要调用 Spring bean 来更新一些数据。

谢谢

如果你在拆分器之后谈论"当所有消息",那么你需要看看一个.aggregate():https://docs.spring.io/spring-integration/docs/5.0.3.RELEASE/reference/html/messaging-routing-chapter.html#aggregator。

拆分器将特殊的sequence details标头填充到每个拆分的项目中,默认情况下,聚合器能够使用收到的消息中的这些标头将它们收集到单个实体。

由于您谈论发送到 Kafka 后的过程,因此您应该将CHANNEL_1作为PublishSubscribeChannel,并将上述.aggregate()作为该频道的最后一个订阅者。更新某些数据的服务应该已经在此聚合器之后。

最新更新