Apache Beam:从Kafka开始读取,从初始偏移开始,而不是从最新偏移开始



我正在尝试编写一个简单的Beam管道,该管道从每个Kafka主题的分区中存在的最早偏移量开始消耗数据。

我一直无法弄清楚如何使用主题中最早可能的偏移量的数据。

通常,KafkaConsumer实例将尝试使用每个分区的最新偏移量中的数据。这意味着他们将开始只使用发布到该主题的新消息。

如果您希望您的管道从消耗最早可用的Kafka偏移量开始,那么您可以通过调用withStartReadTime参数来实现这一点:

p.apply(KafkaIO.read()
.withBootstrapServers(KAFKA_BOOTSTRAP_SERVER)
.withTopic(KAFKA_TOPIC)
// By reading data from EPOCH, you'll ensure the earliest messages are consumed
.withStartReadTime(Instant.EPOCH))

这样就可以了!

最新更新