从最早开始读取:
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
阅读最新消息:
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
但是,如果我想从第18次提交开始,我应该使用哪一行呢?
您可以使用seek()
来强制消费者从特定的偏移量开始消费:
public void seek(TopicPartition partition, long offset)
覆盖使用者将在下一个
poll(timeout)
上使用的获取偏移。如果为同一分区多次,最新的偏移量将用于下一个CCD_ 4。请注意,如果此API是任意的,则可能会丢失数据在消耗过程中使用,以重置提取偏移
例如,假设您想从偏移18
:开始
TopicPartition topicPartition = new TopicPartition("myTopic", 0);
Long startOffset = 18L;
List<TopicPartition> topics = Arrays.asList(topicPartition);
consumer.assign(topics);
consumer.seek(topicPartition, startOffset);
// Then consume messages as normal..