禁止KTable聚合到中间主题



我试图在聚合后抑制KTable。当时间窗口很大(12、24小时等(并且可以堆积大量数据时,最好将它们存储在不同的主题中。

KTable<Windowed<String>, Object>是聚合结果的类型。

在聚合调用抑制后立即尝试将导致异常:

.suppress(Suppressed.<Windowed<String>>untilWindowCloses(Suppressed.BufferConfig.unbounded()
.withLoggingEnabled(Map.of())) 
.withName("IntermediaryAggregationsStore"))

例外:

Caused by: java.lang.ClassCastException: class org.apache.kafka.streams.kstream.Windowed cannot be cast to class java.lang.String (org.apache.kafka.streams.kstream.Windowed is in unnamed module of loader 'app'; java.lang.String is in module java.base of loader 'bootstrap')

在初始配置中,使用字符串序列号,它必须保持这种状态,直到抑制时刻。

有什么办法处理这个问题吗?

split.branch((key, value) -> true, branchConsumer(s -> s
//.transform(TimestampTransformer::new)
.map((key, someObject) -> new KeyValue<>(someObject.getId(), someObject))
.groupByKey()
.windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMinutes(1), Duration.ofMinutes(1)))
.aggregate(NewObject::new,
(key, value, aggregate) -> {
// do the mapping between SomeObject and NewObject
);
return aggregate;
}
)
.suppress(Suppressed.<Windowed<String>>untilWindowCloses(Suppressed.BufferConfig.unbounded()
.withLoggingEnabled(Map.of()))
.withName("someName"))
// continue with what's left to do

aggregate函数接受一个Materialized类,您可以覆盖传递给抑制的序列化程序。

相关内容

  • 没有找到相关文章

最新更新