Kafka Streams KeyValueStore retention.bytes



我对KeyValueStore有一个有趣的行为,我有一些假设来解释它,也许你可以说我是对还是错......

我配置了一个状态存储,如下所示

Map<String, String> storeConfig = new HashMap<>();
storeConfig.put(TopicConfig.RETENTION_MS_CONFIG, TimeUnit.DAYS.toMillis(30));
storeConfig.put(TopicConfig.CLEANUP_POLICY_CONFIG, "compact,delete");
StoreBuilder store1 = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("STORE1"),
Serdes.String(),
Serdes.String()
);
streamsBuilder.addStateStore(store1.withLoggingEnabled(storeConfig));

使用此配置,我预计比 30 天更早的数据集会消失,但我观察到的东西完全不同。

当我查看商店的 rockdb 目录时,每 14451 字节它就会滚动文件,并且我在目录中有这样的结构

14451  1. Oct 19:00 LOG
14181 30. Sep 15:59 LOG.old.1569854012833395
14451 30. Sep 17:40 LOG.old.1569918431235734
14451  1. Oct 11:05 LOG.old.1569949239434224

似乎不是实现配置的 30 天保留期,而是实现了文件大小。

我在互联网上发现还有参数Topic.RETENTION_BYTES_CONFIG"retention.bytes",我是否还必须配置此参数,因此我的数据在保留期间可见,并且不会因为文件大小而被删除(我知道我的密钥有价值,但在发生这种现象后我无法访问它(...

谢谢你的答案..

在内部,KeyValueStores使用RocksDB,而RocksDB在内部使用所谓的LSM-Tree(Log-Structured-Merged-Tree(,它创建了许多较小的段,这些段后来组合成较大的段。在此"压缩"步骤之后,可以删除较小的段文件,因为数据将复制到较大的段文件中。因此,没有什么可担心的。

此外,Topic.RETENTION_MS_CONFIG是一个主题配置,与 Kafka Streams 应用程序的本地存储无关。此外,KeyValueStore将永久保留数据,直到通过"逻辑删除"消息明确删除。因此,如果为基础更改日志主题设置保留时间,则可能会在主题中删除数据,但不会在本地存储中删除数据。

如果要将保留时间应用于本地存储,则不能使用KevValueStore,但可以使用支持保留时间的WindowedStore

相关内容

最新更新