升级到Spring Integration 5后,消耗消息不再工作



我正在尝试使用Spring Integration 4.3和Spring Boot 1.6升级项目为Spring Integration 5.1和Spring Boot 2.1。以前我有以下配置:

IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "queueName")
                    .id("myId")
                    .autoStartup(autoStartup)
                    .prefetchCount(10)
                    .concurrentConsumers(2)
                    .maxConcurrentConsumers(3)
                    .messageConverter(messageConverter()))
                    .aggregate(a -> a.correlationExpression("payload.entityId")
                                    .releaseExpression("size() eq iterator().next().payload.batchSize")
                                    .sendPartialResultOnExpiry(true)
                                    .groupTimeout(2000)
                                    .expireGroupsUponCompletion(true)
                                    .outputProcessor(myMessageGroupProcessor))
                    .handle(serviceActivatorBean, "myMethod", e -> e.advice(requestHandlerRetryAdviceForIntegrationFlow()))
                    .get();

在升级过程中,我尝试在此处遵循该文档,因此将配置更改为:

@Configuration
@EnableAutoConfiguration
@EnableIntegration
public class SpringConfig {
    @Bean(name = "myFlowId")
    public IntegrationFlow myFlow(ConnectionFactory connectionFactory, ServiceActivatorBean serviceActivatorBean,
                                  @Value("${spring.integration.flow.auto-startup:true}") boolean autoStartup,
                                  MyMessageGroupProcessor myMessageGroupProcessor) {
        IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "queueName")
                        .id("myId")
                        .autoStartup(autoStartup)
                        .configureContainer(c -> c.acknowledgeMode(MANUAL)
                            .prefetchCount(10)
                            .concurrentConsumers(2)
                            .maxConcurrentConsumers(3)
                        )
                        .messageConverter(messageConverter()))
                        .aggregate(a -> a.correlationExpression("payload.entityId")
                                        .releaseExpression("size() eq one().payload.batchSize")
                                        .sendPartialResultOnExpiry(true)
                                        .groupTimeout(2000)
                                        .expireGroupsUponCompletion(true)
                                        .outputProcessor(myMessageGroupProcessor))
                        .handle(serviceActivatorBean, "myMethod", e -> e.advice(requestHandlerRetryAdviceForIntegrationFlow()))
                        .get();
    }
}

但是,当我发布消息时,似乎并没有被集成流程收到/处理。我没有任何错误日志(即使我启用调试日志记录,也根本没有任何日志(,而且我不太确定从哪里开始调试。我很肯定这些消息实际上已发布给RabbitMQ,因此这不是问题。我会缺少什么?

我的问题实际上不是由于春季的整合,而是与Spring AMQP中的更改有关。以前可以这样创建"声明性":

@Bean
List<Binding> myBinding() {
    return List.of(<binding1>, <binding2>, ..)
}

,但在春季AMQP 2.1中应该更改为:

@Bean
Declarables myBinding() {
    return new Declarables(List.of(<binding1>, <binding2>, ..))
}

请参阅此处的文档。

顺便说一句,我的releaseExpression也错了,应该是size() eq one.payload.batchSize

相关内容

  • 没有找到相关文章

最新更新