Kafka提供了有用的命令行工具kafka.tools.GetOffsetShell
,但我在我的应用程序中需要它的功能。
我想获取指定主题中每个分区的所有偏移量,如下所示:
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka:9092 --topic com.group.test.Foo
com.group.test.Foo:0:10
com.group.test.Foo:1:11
com.group.test.Foo:2:10
但我不想运行进程bin/kafka-run-class.sh kafka.tools.GetOffsetShell
.
如何在 Java 中使用 kafka api 做同样的事情? 我是否必须为每个TopicPartition
创建 consumer 并调用:KafkaConsumer#position
?我需要更简单的方法吗?
默认情况下,GetOffsetShell
返回每个分区的结束偏移量。您可以像这样以编程方式检索这些偏移量:
......
try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties)) {
consumer.subscribe(Arrays.asList("topicName"));
Set<TopicPartition> assignment;
while ((assignment = consumer.assignment()).isEmpty()) {
consumer.poll(Duration.ofMillis(100));
}
consumer.endOffsets(assignment).forEach((tp, offset) -> System.out.println(tp + ": " + offset));
}