我想使用spring cloud stream手动提交偏移量-仅当消息处理成功时。这是我的代码应用程序。yml,处理程序类
public void process(Message<?> message) {
System.out.println(message.getPayload());
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
if (acknowledgment != null) {
System.out.println("Acknowledgment provided");
acknowledgment.acknowledge();
}
}
---------------------------------------------------------------------------------
spring:
application:
name: springCloud
cloud:
stream:
default-binder: kafka
kafka:
bindings:
myChannel:
consumer:
autoCommitOffset: false
但是我的确认对象是空的,因为头对象'kafka_acknowledgement'本身不存在。
- 如何获取确认对象?
- 我的要求是只有在处理成功时才提交偏移量,如果处理失败,我不想从通道弹出消息,以便以后可以读取它。上述代码是否足以实现这一点?
你用的是什么版本?
在3.1中,autoCommitOffset
被弃用,而ackMode
被设置为manual
;然而,看起来autoCommitOffset
现在完全被忽略了,而不是被弃用了。
使用yaml文件时,请使用属性'auto-commit-offset'