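I'm trying to suppress a KTable after aggregation. When the time windows are large (12 hours, 24 hours, etc.) and a lot of data can pile up, it is better to keep that data in a separate topic.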
The aggregation result has the type `KTable<Windowed<String>, Object>`.
Calling suppress right after the aggregation throws an exception:
.suppress(Suppressed.<Windowed<String>>untilWindowCloses(Suppressed.BufferConfig.unbounded()
                .withLoggingEnabled(Map.of()))
        .withName("IntermediaryAggregationsStore"))
Exception:
Caused by: java.lang.ClassCastException: class org.apache.kafka.streams.kstream.Windowed cannot be cast to class java.lang.String (org.apache.kafka.streams.kstream.Windowed is in unnamed module of loader 'app'; java.lang.String is in module java.base of loader 'bootstrap')
The initial configuration uses a String serde, and it has to stay that way up to the point of suppression.
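To illustrate where the `ClassCastException` can come from, here is a minimal, Kafka-free model of the failure. The `Serializer`, `StringSerializer` and `Windowed` types below are simplified hypothetical stand-ins written for this sketch, not the Kafka classes. The assumption being modeled is that, when no key serde is declared for the windowed result, the suppression buffer falls back to the configured default String serializer and calls it through an erased generic type, so the cast to `String` only fails at runtime, inside the serializer.

```java
import java.nio.charset.StandardCharsets;

public class SerdeMismatchDemo {

    // Hypothetical stand-in for org.apache.kafka.common.serialization.Serializer
    interface Serializer<T> {
        byte[] serialize(String topic, T data);
    }

    // Hypothetical stand-in for Kafka's StringSerializer (the "default" serde here)
    static final class StringSerializer implements Serializer<String> {
        @Override
        public byte[] serialize(String topic, String data) {
            return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Hypothetical stand-in for Windowed<K>: the original key plus window bounds
    static final class Windowed<K> {
        final K key;
        final long start;
        final long end;

        Windowed(K key, long start, long end) {
            this.key = key;
            this.start = start;
            this.end = end;
        }
    }

    // Models a buffer that only sees its serializer through a raw (erased) type.
    // The compiler cannot check the key type, so the bridge method in
    // StringSerializer performs the cast to String at runtime.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static byte[] serializeWithDefault(Serializer defaultSerializer, Object key) {
        return defaultSerializer.serialize("changelog", key);
    }

    public static void main(String[] args) {
        Windowed<String> windowedKey = new Windowed<>("id-1", 0L, 60_000L);
        try {
            serializeWithDefault(new StringSerializer(), windowedKey);
        } catch (ClassCastException e) {
            // Same kind of error as in the question: Windowed cannot be cast to String
            System.out.println("ClassCastException: " + e.getMessage());
        }
    }
}
```

A plain `String` key passes through the same path without error; only the windowed key trips the cast, which matches the symptom appearing exactly at `suppress`.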
Is there any way to handle this?
split.branch((key, value) -> true, branchConsumer(s -> s
        //.transform(TimestampTransformer::new)
        .map((key, someObject) -> new KeyValue<>(someObject.getId(), someObject))
        .groupByKey()
        .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMinutes(1), Duration.ofMinutes(1)))
        .aggregate(NewObject::new,
                (key, value, aggregate) -> {
                    // do the mapping between SomeObject and NewObject
                    return aggregate;
                }
        )
        .suppress(Suppressed.<Windowed<String>>untilWindowCloses(Suppressed.BufferConfig.unbounded()
                        .withLoggingEnabled(Map.of()))
                .withName("someName"))
        // continue with what's left to do
The `aggregate` function accepts a `Materialized` instance, through which you can override the serdes that are passed on to `suppress`.
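A minimal sketch of that fix, assuming Kafka Streams 3.x (for `TimeWindows.ofSizeAndGrace`) and a String-keyed stream. The class `SuppressedWindowedAggregation` and the generic helper `aggregateAndSuppress` are hypothetical names, and the serdes are parameters because the question does not show how `SomeObject` or `NewObject` are serialized. As far as I can tell, when a key serde is known at `aggregate` time (given via `Materialized`, or inherited from `Grouped` on `groupByKey`), Kafka Streams wraps it in a time-windowed serde for the resulting `KTable<Windowed<String>, ...>`, and `suppress` reuses that instead of the default String serde; the sketch sets it in both places.

```java
import java.time.Duration;
import java.util.Map;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

public class SuppressedWindowedAggregation {

    // Hypothetical helper: windowed aggregation followed by suppression, with
    // key and value serdes declared explicitly instead of relying on defaults.
    static <V, A> KTable<Windowed<String>, A> aggregateAndSuppress(
            KStream<String, V> stream,
            Serde<V> valueSerde,
            Serde<A> aggregateSerde,
            Initializer<A> initializer,
            Aggregator<String, V, A> aggregator) {
        return stream
                // After map() the key serde is unknown; declare it for the repartition
                .groupByKey(Grouped.with(Serdes.String(), valueSerde))
                .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMinutes(1), Duration.ofMinutes(1)))
                // The String key serde given here is what the windowed KTable's
                // key serde is built from; the value serde covers the aggregate
                .aggregate(initializer, aggregator, Materialized.with(Serdes.String(), aggregateSerde))
                .suppress(Suppressed.<Windowed<String>>untilWindowCloses(
                                Suppressed.BufferConfig.unbounded().withLoggingEnabled(Map.of()))
                        .withName("IntermediaryAggregationsStore"));
    }
}
```

For example, with String values and a Long aggregate, `aggregateAndSuppress(input, Serdes.String(), Serdes.Long(), () -> 0L, (key, value, agg) -> agg + value.length())` builds the suppressed windowed table. In the question's topology, `valueSerde` would be a serde for `SomeObject` and `aggregateSerde` one for `NewObject`.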