我尝试从Java中清除Kafka主题,如下所示,将保留时间设置为1秒,然后返回到原始值。但这些消息并没有从主题中删除。怎么了?
Map<ConfigResource,Collection<AlterConfigOp>> altConf = new HashMap<ConfigResource,Collection<AlterConfigOp>>();
Collection<AlterConfigOp> altConfOp = new ArrayList<AlterConfigOp>();
AlterConfigOp aco1 = new AlterConfigOp(new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "1000"), AlterConfigOp.OpType.SET);
altConfOp.add(aco1);
altConf.put(new ConfigResource(ConfigResource.Type.TOPIC, topic), altConfOp);
ac.incrementalAlterConfigs(altConf);
Thread.sleep(5000);
altConf = new HashMap<ConfigResource,Collection<AlterConfigOp>>();
altConfOp = new ArrayList<AlterConfigOp>();
aco1 = new AlterConfigOp(new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, originalRetention), AlterConfigOp.OpType.SET);
altConfOp.add(aco1);
altConf.put(new ConfigResource(ConfigResource.Type.TOPIC, topic), altConfOp);
ac.incrementalAlterConfigs(altConf);
正如评论中提到的,主题保留限制是下限,Kafka不保证超过限制的记录何时会被删除。
删除记录的正确方法是使用DeleteRecords API。
对于AdminClient,您可以使用deleteRecords()
。这将删除所有偏移量低于所提供偏移量的记录。您可以使用listOffsets()
来查找分区的结束偏移量,并将它们传递给deleteRecords()
来有效地清除主题。