我们正在尝试将我们的Spring Cloud Stream应用程序从Kafka移植到AWS Kenesis。我们需要手动确认处理某些超时条件。
对于Kafka,我们使用属性autocommitoffset
为false,并使用ACKNOWLEDGEMENT头来处理手动确认。
我浏览了Spring Cloud Stream的文档浏览了以下内容:https://dataflow.spring.io/docs/recipes/kinesis/simple-producer-consumer/https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis/blob/master/spring-cloud-stream-binder-kinesis-docs/src/main/asciidoc/overview.adoc
但无法找到任何解决方案。任何提示都会很有帮助的。
经过一番搜索,找到了解决方案:
在Kenesis中,shard
相当于partition
,checkpoint
相当于offset
In Application Yml:
spring:
cloud:
stream:
kinesis:
bindings:
consumer-in-0:
consumer:
checkpointMode: manual
检查点示例代码
@Bean
public Consumer<Message<String>> consume() {
return message -> {
System.out.println("message received : "+ message.getPayload());
System.out.println("message headers : "+ message.getHeaders());
Checkpointer checkPointer = (Checkpointer) message.getHeaders().get(AwsHeaders.CHECKPOINTER);
checkPointer.checkpoint();
};
}
被称为Kenesis Consumer Binder属性