我们有一个 Kafka 流聚合拓扑。我们需要控制 changeLog 主题的大小,以降低 Kafka 存储成本。因此,我们在拓扑中使用转换器 (DSL API) 来调度一个标点符号,该标点符号使用 keyValueStore.delete() 从状态存储中删除旧记录。
我能够验证删除后,在标点符号的进一步计划触发器上,已删除的密钥不在状态存储中。但是它是否也从更改日志主题中删除了记录?更重要的是,它是否也减小了 changeLog 主题的大小,从而控制了 Kafka 存储成本?
是的,对状态存储的更改将应用于更改日志主题。
否,发出"删除"命令时,不会将实际的记录删除到changelog
主题中。请注意,"delete"命令实际上是一条记录,其null
值(又名tombstone
)写入主题(changelog
或任何其他) - 请参阅此处:
值以特殊方式解释:具有空值的记录 值表示记录键的"DELETE"或逻辑删除
所以,事实上,这种解释是让它感觉像是删除的解释;人们可以将changelog
主题(你必须知道确切的主题名称)作为KStream或使用Kafka Consumer API来阅读,并在那里找到tombstone
记录(直到被压缩或保留线程删除)。但是,如果您使用 KTable 阅读changelog
或任何压缩主题,则tombstone
记录将确定从关联的存储中删除 - 您将不再在存储中找到相关键,尽管它实际上存在于相关的压缩主题中。
如果在主题上启用了压缩策略(默认情况下在changelog
主题上启用),则会删除其记录,直到特定键的最后一个记录。因此,在某些时候,您只会拥有删除记录,因为压缩 Kafka 线程删除了具有相同键的先前记录。