收到Spring Integration的确认消息



我有一个用例,我希望在应用程序收到消息后立即确认消息,而不是等待流完成。流量如下。当前,简单消息侦听器容器配置为AUTO确认模式。

@Bean
public IntegrationFlow processAggregateEventFlow(SimpleMessageListenerContainer messageListenerContainer,
@Qualifier("errorChannel") MessageChannel eventErrorChannel) {

return IntegrationFlows
// Create message listener container and queueEvent error channel
.from(Amqp.inboundAdapter(messageListenerContainer).errorChannel(eventErrorChannel))
.transform(new JsonToObjectTransformer(Request.class, jacksonConfiguration.jsonObjectMapper()))
.filter(Request.class, e -> true)
.handle(requestMessageHandler)
.get();
}

查看AcknowledgeMode.NONE并阅读它们的JavaDocs:

public enum AcknowledgeMode {
/**
* No acks - {@code autoAck=true} in {@code Channel.basicConsume()}.
*/
NONE,
/**
* Manual acks - user must ack/nack via a channel aware listener.
*/
MANUAL,
/**
* Auto - the container will issue the ack/nack based on whether
* the listener returns normally, or throws an exception.
* <p><em>Do not confuse with RabbitMQ {@code autoAck} which is
* represented by {@link #NONE} here</em>.
*/
AUTO;

对于MANUAL,有几个标头添加到来自AMQP入站通道适配器的消息中:

headers.put(AmqpHeaders.DELIVERY_TAG, deliveryTag);
headers.put(AmqpHeaders.CHANNEL, channel);

因此,您可以在from(Amqp)之后有一个.handle()来调用channel.basicAck(deliveryTag, false);。请参阅文档中的更多信息:https://docs.spring.io/spring-integration/docs/current/reference/html/amqp.html#amqp-入站ack

最新更新