Kafka主题清除编程



我尝试从Java中清除Kafka主题,如下所示,将保留时间设置为1秒,然后返回到原始值。但这些消息并没有从主题中删除。怎么了?

Map<ConfigResource,Collection<AlterConfigOp>> altConf = new HashMap<ConfigResource,Collection<AlterConfigOp>>();
Collection<AlterConfigOp> altConfOp = new ArrayList<AlterConfigOp>();
AlterConfigOp aco1 = new AlterConfigOp(new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "1000"), AlterConfigOp.OpType.SET);
altConfOp.add(aco1);
altConf.put(new ConfigResource(ConfigResource.Type.TOPIC, topic), altConfOp);

ac.incrementalAlterConfigs(altConf);

Thread.sleep(5000);
altConf = new HashMap<ConfigResource,Collection<AlterConfigOp>>();
altConfOp = new ArrayList<AlterConfigOp>();
aco1 = new AlterConfigOp(new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, originalRetention), AlterConfigOp.OpType.SET);
altConfOp.add(aco1);
altConf.put(new ConfigResource(ConfigResource.Type.TOPIC, topic), altConfOp);

ac.incrementalAlterConfigs(altConf);

正如评论中提到的,主题保留限制是下限,Kafka不保证超过限制的记录何时会被删除。

删除记录的正确方法是使用DeleteRecords API。

对于AdminClient,您可以使用deleteRecords()。这将删除所有偏移量低于所提供偏移量的记录。您可以使用listOffsets()来查找分区的结束偏移量,并将它们传递给deleteRecords()来有效地清除主题。

相关内容

  • 没有找到相关文章

最新更新