日志压缩主题不应针对同一密钥保持重复。但在我们的情况下,当发送具有相同密钥的新值时,前一个值不会被删除。可能是什么问题?
val TestCompactState: KTable[String, TestCompact] = builder.table[String, TestCompact](kafkaStreamConfigs.getString("testcompact-source"),
(TestCompactmaterialized).withKeySerde(stringSerde).withValueSerde(TestCompactSerde))
我得到的实际结果
Offsets Keys Messages
5 {"id":5} {"id":5,"namee":"omer","__deleted":"false"}
6 {"id":5} {"id":5,"namee":"d","__deleted":"false"}
我只想要最新的一张唱片对着同一把钥匙预期结果
6 {"id":5} {"id":5,"namee":"d","__deleted":"false"}
这种行为可能有几个原因。压缩清理策略不会在每个传入消息之后运行。取而代之的是代理配置
log.clear.min.compaction.lage.ms:消息在日志中保持未压缩状态的最短时间。仅适用于正在压缩的日志。
类型:长;默认值:0;有效值:;更新模式:集群范围
默认为0
,因此这可能不是原因,但值得检查。
需要注意的是,compact
策略从不压缩当前段。消息只能在非活动段上删除以进行压缩。确保验证
log.segment.bytes:单个日志文件的最大大小
类型:int;默认值:1073741824;有效值:[14,…];更新模式:集群范围
压缩通常由日志的当前("脏"(段中的数据触发。术语";脏的";来自未清理/未压缩。还有另一种配置可以帮助控制压实。
log.cleaner.min.cleanable.tratio:有资格进行清理的日志的脏日志与总日志的最小比率。如果还指定了log.cleaner.max.com.paction.lag.ms或log.clealer.min.com.paction.loag.ms配置,则日志压缩程序认为该日志符合压缩条件,只要:(i(已达到脏比率阈值,并且该日志至少在log.cleaer.min.compaction.lag.ms持续时间内具有脏(未压缩(记录,或者(ii(如果日志最多在log.cleaner.max.compation.lag.ms时段内具有脏(未压缩(记录。
类型:双;默认值:0.5;有效值:;更新模式:集群范围
默认情况下,要压缩的消息的删除滞后非常高,如下配置描述所示。
log.clear.max.compaction.lage.ms:消息在日志中不符合压缩条件的最长时间。仅适用于正在压缩的日志。
类型:长;默认值:9223372036854775807;有效值:;更新模式:集群范围
总之,您观察所描述内容的原因可能有几个。非常重要的一点是要注意,压缩主题并不能为同一密钥提供任何重复消息的保证。它只能保证";至少";保留相同密钥的最新消息。
有一个很好的博客更详细地解释了日志压缩。
据我所知,不可能应用日志压缩策略来保持每个键一条消息。即使设置了cleanup.policy=compact
(主题级别(或log.cleanup.policy=compact
(全局级别(,也不能保证只保留最新的消息,而压缩较旧的消息。
根据卡夫卡官方文件:
日志压缩为我们提供了更细粒度的保留机制保证至少保留每个主键的最后一次更新
分区的活动段从未被压缩,因此在开始删除旧的重复项之前,可能需要一些时间和更多的消息发送到主题。