我有一个带有 8 个分区的 kafka 主题,从单个消费者订阅该主题,并且我为消费者提供了唯一的消费者组。现在我尝试仅使用来自所有分区的最新消息(在我的情况下,从当前时间开始 3 分钟前(。 我使用了如下所示的偏移时间方法。
List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
List<TopicPartition> topicPartions = partitionInfos.stream().......collect(Collectors.toList());
Long value = Instant.now().minus(120,ChronoUnit.SECONDS).toEpochMillis();
Map<TopicPartion,Long> topicPartitionTime = topicPartions.stream().collect(COllectors.toMap(tp -> tp,(value)));
Map<TopicPartition, OffsetAndTimeStamp> offsets = consumer.offsetsForTimes(topicPartitionTime);
现在的问题是 偏移时间仅返回一个或两个分区偏移位置,并为剩余位置返回 null。
我想消耗所有分区最近的消息,而不是一个或两个分区。
我也在下面尝试过
consumer.unsubscribe();
consumer.assign(allPartitions);
Map<TopicPartition, OffsetAndTimeStamp> offsets = consumer.offsetsForTimes(topicPartitionTime);
但仍然只得到一两个偏移位置。在最坏的情况下,有时所有部分的偏移量为空。
如果 offsetForTimes 仅适用于一个/两个分区,如何从单个消费者轮询所有分区最近的记录?
编辑:我正在使用Kafka集群。 在 3-4 台机器上共享 8 个分区。
其他输入:-我能够在以下情况下重现问题。
- 创建三个主题 A(1 分区(、B(10 分区(和 C(10-分区(
- KafkaStreams使用来自A的消息并将消息推送到B&C。
- 将大约 100 条消息推送到 A 主题。KafkaStreams消费并推送到B&C主题。我可以看到消息分布在B&C中的所有分区上(即10个分区包含大约10条消息(。
- 我创建了单个KafkaConsumer,消费B主题。现在我调用 offsetForTimes 方法,所有分区和时间戳是当前时间减去 5 分钟。
- 确保 consumer.assignment(( 返回 offsetForTimes 之前的所有分区。
- offsetForTimes 返回带有偏移位置的单个分区,但当我调用 consumer.poll 方法时,它也返回来自其他分区的消息。
使用 Apache Kafka 版本 - 2.11-2.2.0 卡夫卡客户端jar - 2.0.1
提前感谢帮助。
我无法重现您的状况; 我唯一得到偏移量null
的情况是该分区没有提交偏移量的情况。 例如,我有 10 个分区,但只写入 8 个分区:
@SpringBootApplication
public class So59200574Application implements ConsumerSeekAware {
public static void main(String[] args) {
SpringApplication.run(So59200574Application.class, args);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so59200574").partitions(10).replicas(1).build();
}
@KafkaListener(id = "so59200574", topics = "so59200574")
public void listen(String in) {
System.out.println(in);
}
@Bean
public ConsumerAwareRebalanceListener rebal() {
return new ConsumerAwareRebalanceListener() {
@Override
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
final long tenSecondsAgo = System.currentTimeMillis() - 10_000L;
partitions.forEach(tp -> timestampsToSearch.computeIfAbsent(tp, tp1 -> tenSecondsAgo));
System.out.println(consumer.offsetsForTimes(timestampsToSearch));
}
};
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> IntStream.range(0, 8).forEach(i -> template.send("so59200574", i, null, "foo" + i));
}
}