我们有一个Kafka实例,每天约有50m记录,每天约有100k输入,因此在Kafka-World中没什么疯狂的。当我们想使用我们更复杂的流应用程序之一(具有许多不同的聚合步骤)重新处理这些记录时,磁盘用法从重新分配主题中变得非常疯狂。从我们所理解的情况下,Theese主题在Kafka-streams 1.0.1中使用标准保留时间(14天?),在2.1.1中使用了Long.max。这是非常不便的,因为对于回音主题,在我们的情况下,每条记录仅在完成聚合时才读取一次,然后才能删除。
因此,我们的问题是是否有任何方法可以在Kafka-streams中配置设置,以清除记录后处理?我已经看到有某种方法可以使用aupgedatabefore()(https://issues.apache.org/jira/browse/kafka-4586)。
供参考,应用程序的一部分中的一些尺寸:
table-1 (changElog,compact〜2GB) ->更改键和聚合(retartition〜14GB) -> table-table-2 (ChangElog,delete,delete,delete,14KB) ->更改键和聚合(重新分配21GB) -> table-3 (changelog,compact,0.5GB)
(这是我的第一个堆栈溢出问题
kafka streams使用purgeDataBefore()
API,因为1.1
版本:https://issues.apache.org/jira/jira/browse/browse/kafka-6150
您不需要启用它(也不能禁用它)。