从指定主题中每个分区的 kafka 最后偏移量中检索



Kafka提供了有用的命令行工具kafka.tools.GetOffsetShell,但我在我的应用程序中需要它的功能。

我想获取指定主题中每个分区的所有偏移量,如下所示:

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka:9092 --topic com.group.test.Foo
com.group.test.Foo:0:10
com.group.test.Foo:1:11
com.group.test.Foo:2:10

但我不想运行进程bin/kafka-run-class.sh kafka.tools.GetOffsetShell.

如何在 Java 中使用 kafka api 做同样的事情? 我是否必须为每个TopicPartition创建 consumer 并调用:KafkaConsumer#position?我需要更简单的方法吗?

默认情况下,GetOffsetShell返回每个分区的结束偏移量。您可以像这样以编程方式检索这些偏移量:

......
try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties)) {
consumer.subscribe(Arrays.asList("topicName"));
Set<TopicPartition> assignment;
while ((assignment = consumer.assignment()).isEmpty()) {
consumer.poll(Duration.ofMillis(100));
}
consumer.endOffsets(assignment).forEach((tp, offset) -> System.out.println(tp + ": " + offset));
}

相关内容

最新更新