我正在使用流DSL并执行状态聚合(从主题中读取数据,将数据汇总和将数据写入另一个主题(。如何减少写入州商店的数据的保留期?目前,我的中国团队说数据在州商店保留了5年,我必须减少这一点。我可以设置特定的配置,以保留数据的时间?
KTable<Windowed<String>, JSONObject> kTable = filteredKstream
.groupBy((key, value) -> getNewKey(value),
Grouped.with(Serdes.String(), new JSONObjectSerde()))
.windowedBy(windows).aggregate(() -> {
SampleData sampleData = new SampleData();
return new JSONObject(mapperUtils.writeValueAsString(sampleData, mapper));
} , (key, value, aggregate) -> {
return getAggregateValue(aggregate, value);
} , Materialized
.<String, JSONObject, WindowStore<Bytes, byte[]>> as(
"sample-store")
.withKeySerde(Serdes.String())
.withValueSerde(jsonSerde));
您可以使用Materialized#withRetention()
设置窗口和会话商店的保留期。
https://kafka.apache.org/22/javadoc/org/apache/kafka/kafka/kafka/streams/kstreams/kstream/materialized.html#withretention-java.time.time.time.time.time.time.time.duration-