如何在Apache Beam中手动提交Kafka偏移在特定的doFun执行结束时



我创建了一个简单的Apache Beam流管道,它从Kafka读取数据,做一些处理,并通过调用一些外部服务的API来持久化结果。我想确保在管道重启或失败期间没有数据丢失,所以我想在特定doFun执行结束时成功调用API后,手动将记录偏移量提交给Kafka。

在我以前的Kafka经验中,我知道通过使用Kafka Consumer的下面API,我可以手动提交记录偏移到Kafka。

consumer.commitSync(currentOffsets); 

在KafkaIO设置中有关闭自动提交的设置,但是我没有找到任何在Apache Beam中手动提交偏移的有效解决方案,因为我似乎无法访问doFun中的消费者。如果有专家可以与示例代码分享一些提示,我将不胜感激。

默认情况下,pipeline.apply(KafkaIO.read()...)将返回PCollection<KafkaRecord<K, V>>。因此,在您的管道下游,您可以从KafkaRecord元数据中获得偏移量,并以您需要的方式手动提交它(只是不要忘记在KafkaIO.read()中禁用AUTO_COMMIT)。

但是,您需要确保对外部API的调用和偏移提交将是原子的,以防止潜在的数据丢失(如果它是关键的)。

最新更新