我如何设置卡夫卡流创建的州商店的保留期



我正在使用流DSL并执行状态聚合(从主题中读取数据,将数据汇总和将数据写入另一个主题(。如何减少写入州商店的数据的保留期?目前,我的中国团队说数据在州商店保留了5年,我必须减少这一点。我可以设置特定的配置,以保留数据的时间?

    KTable<Windowed<String>, JSONObject> kTable = filteredKstream
            .groupBy((key, value) -> getNewKey(value),
                    Grouped.with(Serdes.String(), new JSONObjectSerde()))
            .windowedBy(windows).aggregate(() -> {
                SampleData sampleData = new SampleData();
                return new JSONObject(mapperUtils.writeValueAsString(sampleData, mapper));
            } , (key, value, aggregate) -> {
                return getAggregateValue(aggregate, value);
            } , Materialized
                    .<String, JSONObject, WindowStore<Bytes, byte[]>> as(
                            "sample-store")
                    .withKeySerde(Serdes.String())
                    .withValueSerde(jsonSerde));

您可以使用Materialized#withRetention()设置窗口和会话商店的保留期。

https://kafka.apache.org/22/javadoc/org/apache/kafka/kafka/kafka/streams/kstreams/kstream/materialized.html#withretention-java.time.time.time.time.time.time.time.duration-

相关内容

  • 没有找到相关文章

最新更新