Kafka 流状态存储 rocksdb 文件大小不会在手动删除消息时减小



>我正在使用处理器 api 从状态存储中删除消息。删除工作成功,我通过使用交互式查询调用 kafka 键对状态存储进行确认,但它不会减少目录 tmp/kafka-streams 下本地磁盘上的 kafka 流文件大小。

@Override
public void init(ProcessorContext processorContext) {
this.processorContext = processorContext;
processorContext.schedule(Duration.ofSeconds(10), PunctuationType.STREAM_TIME, new Punctuator() {
@Override
public void punctuate(long l) {
processorContext.commit();
}
}); //invoke punctuate every 12 seconds
this.statestore = (KeyValueStore<String, GenericRecord>) processorContext.getStateStore(StateStoreEnum.HEADER.getStateStore());
log.info("Processor initialized");
}
@Override
public void process(String key, GenericRecord value) {
statestore.all().forEachRemaining(keyValue -> {
statestore.delete(keyValue.key);
});
}

Kafka 流目录大小

2.3M    /private/tmp/kafka-streams
3.3M    /private/tmp/kafka-streams

我是否需要任何特定配置才能控制文件大小?如果它不能以这种方式工作,是否可以删除 kafka-streams 目录?我认为它应该是安全的,因为这种删除将从状态存储和更改日志主题中删除记录。

RocksDB 在后台进行文件压缩。因此,如果您需要更积极的压缩,则应通过 Streams 配置参数rocksdb.config.setter传入自定义RocksDBConfigSetter。有关 RockDB 的更多详细信息,请查看 RocksDB 文档。

https://docs.confluent.io/current/streams/developer-guide/config-streams.html#rocksdb-config-setter

但是,只要没有真正的问题,我不建议更改 RocksDB 配置——你可以弊大于利。看来你的商店规模很小,因此,我没有看到真正的问题ATM。

顺便说一句:如果你进入生产环境,你应该将state.dir配置更改为适当的目录,即使在重新启动机器后,状态也不会丢失。如果将状态放入默认/tmp位置,则重新启动计算机后状态很可能消失,并且会触发从更改日志主题进行昂贵的恢复。

最新更新