消息的手动确认(检查点):Spring Cloud Stream Kenesis Binder &g



我们正在尝试将我们的Spring Cloud Stream应用程序从Kafka移植到AWS Kenesis。我们需要手动确认处理某些超时条件。

对于Kafka,我们使用属性autocommitoffset为false,并使用ACKNOWLEDGEMENT头来处理手动确认。

我浏览了Spring Cloud Stream的文档浏览了以下内容:https://dataflow.spring.io/docs/recipes/kinesis/simple-producer-consumer/https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis/blob/master/spring-cloud-stream-binder-kinesis-docs/src/main/asciidoc/overview.adoc

但无法找到任何解决方案。任何提示都会很有帮助的。

经过一番搜索,找到了解决方案:

在Kenesis中,shard相当于partition,checkpoint相当于offset

In Application Yml:

spring:
cloud:
stream:
kinesis:
bindings:
consumer-in-0:
consumer:
checkpointMode: manual

检查点示例代码

@Bean
public Consumer<Message<String>> consume() {
return message -> {
System.out.println("message received : "+ message.getPayload());
System.out.println("message headers : "+ message.getHeaders());
Checkpointer checkPointer = (Checkpointer) message.getHeaders().get(AwsHeaders.CHECKPOINTER);
checkPointer.checkpoint();

};
}

被称为Kenesis Consumer Binder属性

最新更新