未删除过期的Apache Kafka消息



我有一个spring-boot(2.1.3(服务,将消息发布到kafka(2.12-2.3.0(主题。该服务创建主题,稍后在服务设置后,将retention.ms设置为1秒。

当前正在调试此代码

@SpringBootApplication()
@EnableAsync
public class MetricsMsApplication {
public static void main(String[] args) {
SpringApplication.run(MetricsMsApplication.class, args);
}
@Bean
public NewTopic topic1() {
NewTopic nt = new NewTopic("metrics", 10, (short) 1);
return nt;
}
@EventListener(ApplicationReadyEvent.class)
private void init() throws ExecutionException, InterruptedException {
Map<String, Object> config = new HashMap<>();
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
AdminClient client = AdminClient.create(config);
ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "metrics");
// Update the retention.ms value
ConfigEntry retentionEntry = new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "1000");
Map<ConfigResource, Config> updateConfig = new HashMap<ConfigResource, Config>();
updateConfig.put(resource, new Config(Collections.singleton(retentionEntry)));
AlterConfigsResult alterConfigsResult = client.alterConfigs(updateConfig);
alterConfigsResult.all();
}

}

我发送了几条消息并计数到5,然后启动控制台消费者

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic admst-metrics --from-beginning

并且仍然获得本应过期的消息。

kafka日志显示应用了retention.ms配置。我添加了cleanup.policy并将其设置为delete,但这应该没有必要,因为它是默认的。

是什么导致这些邮件被删除?

简短的回答-kafka并不是为了纪念如此低的保留值而设计的。

更长的答案:

Kafka将任何(主题(分区的数据存储在分段文件中。在任何时候,单个段都是"0";活动的";并且在所有较旧的段都是"0"的同时被写入;关闭";。保留/压实仅适用于非活动段。

当log.roll.ms或log.segment.bytes被命中时,Kafka会滚动新的段。默认值(请参阅https://kafka.apache.org/documentation/#brokerconfigs)为7天和/或~1GB。

还有log.segment.delete.delay.ms,默认情况下,这意味着任何片段都会保留至少一分钟。

压缩/删除非活动段的工作由日志清理器线程完成。当找不到工作时,这些人睡眠log.cleaner.backoff.ms(15秒(,并且只检查每个log.retension.check.interval.ms(5分钟(是否可以清洁任何特定的段

所有这些的结果是,默认情况下,任何接近您所需的保留值都是不可能的。

你可以试着调整以上所有的值,看看你能降到多低,但我敢打赌,这对大量的主题来说不会很好。

最新更新