如何使用Java中的Kafka消息,从特定的偏移量开始



从最早开始读取:

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

阅读最新消息:

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

但是,如果我想从第18次提交开始,我应该使用哪一行呢?

您可以使用seek()来强制消费者从特定的偏移量开始消费:

public void seek(TopicPartition partition, long offset)

覆盖使用者将在下一个poll(timeout)上使用的获取偏移。如果为同一分区多次,最新的偏移量将用于下一个CCD_ 4。请注意,如果此API是任意的,则可能会丢失数据在消耗过程中使用,以重置提取偏移


例如,假设您想从偏移18:开始

TopicPartition topicPartition = new TopicPartition("myTopic", 0);
Long startOffset = 18L;

List<TopicPartition> topics = Arrays.asList(topicPartition);
consumer.assign(topics);
consumer.seek(topicPartition, startOffset);
// Then consume messages as normal..

相关内容

最新更新