我正在使用KafkaIO.read()
,我想从特定的偏移量开始消费。
在某些时候,曾经有一种KafkaIO.read().withStartFromCheckpointMark()
方法可以做到这一点。
我从文档中看到有一种方法可以通过以下方式:
卡夫卡检查点标记由运行器提供;
我该怎么做?
谢谢
没有直接支持,但有几个选项:
withStartReadTime()
可能更适合。- 您可以创建
group.id
并在该组中提交偏移量。在 KafkaConsumer 配置中设置group.id
时,KafkaIO 默认为从为组提交的偏移量恢复。您还可以选择在 KafkaConsumer 中启用auto.commit
。请参阅 JavaDoc 中的"高级 Kafka 配置"。