Kafka 1.1版
我们使用Kafka KStream基于事件本身中的选定密钥来聚合事件。以下大致是的作用
KStream[String, Event]
.selectKey[String]({ (_, event) =>
aggregationKey(event)
}
.groupByKey()
.aggregate(
() => {
Event("", "")
},
(k: Any, event: Event, aggregate: Event) => aggregator(k, event, aggregate, inputTopicName),
Materialized
.as[String, Event, KeyValueStore[Bytes, Array[Byte]]]("store-name")
.withValueSerde(protoSerde)
)
.toStream
.to(outTopicName)
在";聚合器";函数I基于某个条件返回null,以便生成tombstone事件。
卡夫卡创造了两个主题,重新划分和变更日志。在重新分区主题中,保留设置为-1。无论墓碑事件如何,这些话题都在不断增长。我找不到清理它们的方法。
我们的要求是直截了当的:
只要满足某个密钥的条件,就不会使用该密钥的聚合更改日志。我们想彻底永久清除该密钥的所有事件
请建议如何清理基于密钥的kstream内部主题?非常感谢。
上游重新分区主题不应无限增长:正如您所注意到的,保留时间设置为-1
(以避免数据丢失(。但是,在处理记录后,KafkaStreams会显式清除该主题。
此外,如果从Aggregator
返回null
,则相应的条目将在KTable
存储中删除,并且tombstone将发送到变更日志主题并发送到下游。当然,tombstone也首先被附加到主题中,并且只有当broker端的主题压缩运行时,旧记录才会被"垃圾收集";。