KAFKA管理员:如何以编程方式每个主题显示和设置保留时间



我正在尝试通过编程(Java)获取并设置Kafka群集中某些主题的保留时间。

使用org.apache.kafka.clients.admin.AdminClient似乎不可能。

除命令行实用程序外,还有其他方法吗?

要模拟 kafka-configs --entity-type topics --entity-name "topic" --describe,您应该能够对此使用 AdminClient#describeConfigs simlar。

在这里,我仅过滤出来的配置,这些配置已由用户定义。如果您删除过滤器,则将获得所有主题级别和经纪级默认配置

Optional<List<ConfigEntry>> dynamicTopicConfigEntries;
try {
    // given org.apache.kafka.client.admin.AdminClient
    ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "topic");
    dynamicTopicConfigEntries = Optional.of(adminClient.describeConfigs(Collections.singletonList(resource))
            .all()
            .thenApply(configMap -> configMap.get(resource).entries()
                    .stream().filter(e -> e.source() == ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG)
                    .collect(toList())
            )
            .get());
} catch (InterruptedException | ExecutionException e) {
    throw new RuntimeException("Unable to get topic description");
}

类似地,有一个--alter标志,该标志支持该命令(没有代码)

另外,KIP-248是值得关注的。

kafka.admin.TopicCommand Scala类,这是kafka-topics shell脚本从kafka二进制发行中使用的内容:

https://github.com/apache/kafka/blob/a421dd26ca140f821cd5be1a4f716cf04beb43/core/src/src/main/main/main/main/scala/scala/kafka/kafka/kafka/admin/admin/admin/topicccamsscal.scal.scala-l302-l318 可以使用它,尽管您需要将Kafka软件包作为对您项目的依赖性,而不仅仅是Kafka客户端。

如果您使用的是Kafka 2.1.1,则为Scala 2.12:

编译
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>2.1.1</version>
</dependency>

https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.12/2.1.1

最新更新