我发现了如何用函数方式进行手动ack。
但是,这并不能解释如何在消息之后手动ack发送
如果ack和发送消息之间存在网络错误,我认为消息将丢失
我不想丢失消息。
版本
- 弹簧靴:2.7.1
- 春云:3.2.4
这是我当前的代码
@Controller
class MyFunction(private val myService: MyService) {
private val objectMapper = ObjectMapper()
init {
objectMapper.registerKotlinModule()
}
@Bean
fun function(): (Message<ByteArray>) -> Message<MyResponse> = {
val channel = it.headers.get(AmqpHeaders.CHANNEL, Channel::class.java)!!
val deliveryTag = it.headers.get(AmqpHeaders.DELIVERY_TAG, java.lang.Long::class.java)!!.toLong()
val request = objectMapper.readValue(it.payload, MyRequest::class.java)
try {
val response = myService.process(request)
channel.basicAck(deliveryTag, false)
// I think it can potentially cause message loss
MessageBuilder.withPayload(result)
.build()
} catch (ex: Exception) {
channel.basicNack(deliveryTag, false, false)
throw ex
}
}
}
使用Consumer
接收消息,使用StreamBridge
发送(而不是使用Function
.
但为什么你需要手动确认?引发异常将导致容器使用自动模式对消息进行nack(并在正常退出时进行ack(。