Spring cloud Stream - kafka - Null确认报头



我想使用spring cloud stream手动提交偏移量-仅当消息处理成功时。这是我的代码应用程序。yml,处理程序类

public void process(Message<?> message) {
System.out.println(message.getPayload());
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
if (acknowledgment != null) {
System.out.println("Acknowledgment provided");
acknowledgment.acknowledge();
}
}
---------------------------------------------------------------------------------
spring:
application:
name: springCloud
cloud:
stream:
default-binder: kafka
kafka:
bindings:
myChannel:
consumer:
autoCommitOffset: false

但是我的确认对象是空的,因为头对象'kafka_acknowledgement'本身不存在。

  1. 如何获取确认对象?
  2. 我的要求是只有在处理成功时才提交偏移量,如果处理失败,我不想从通道弹出消息,以便以后可以读取它。上述代码是否足以实现这一点?

你用的是什么版本?

在3.1中,autoCommitOffset被弃用,而ackMode被设置为manual;然而,看起来autoCommitOffset现在完全被忽略了,而不是被弃用了。

使用yaml文件时,请使用属性'auto-commit-offset'

最新更新