使用Spring Cloud Stream+RabbitMQ(Functional)发送消息后手动确认



我发现了如何用函数方式进行手动ack。

但是,这并不能解释如何在消息之后手动ack发送
如果ack和发送消息之间存在网络错误,我认为消息将丢失
我不想丢失消息。

版本

  • 弹簧靴:2.7.1
  • 春云:3.2.4

这是我当前的代码

@Controller
class MyFunction(private val myService: MyService) {
private val objectMapper = ObjectMapper()
init {
objectMapper.registerKotlinModule()
}

@Bean
fun function(): (Message<ByteArray>) -> Message<MyResponse> = {
val channel = it.headers.get(AmqpHeaders.CHANNEL, Channel::class.java)!!
val deliveryTag = it.headers.get(AmqpHeaders.DELIVERY_TAG, java.lang.Long::class.java)!!.toLong()

val request = objectMapper.readValue(it.payload, MyRequest::class.java)

try {
val response = myService.process(request)

channel.basicAck(deliveryTag, false)
// I think it can potentially cause message loss
MessageBuilder.withPayload(result)
.build()
} catch (ex: Exception) {
channel.basicNack(deliveryTag, false, false)
throw ex
}
}
}

使用Consumer接收消息,使用StreamBridge发送(而不是使用Function.

但为什么你需要手动确认?引发异常将导致容器使用自动模式对消息进行nack(并在正常退出时进行ack(。

最新更新