I am trying to reproduce this example. My topology is:
@Bean("myTopo")
public KStream<Object, Object> getTopo(@Qualifier("myKConfig") StreamsBuilder builder) {
    var stream = builder.stream("my-events");
    // Count events per key in 2-minute windows and emit only the final result per window.
    stream.groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofMinutes(2)))
        .count()
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
        .toStream()
        .foreach((k, v) -> {
            System.out.println("k + v = " + k + " --- " + v);
        });
    return stream;
}
I have set the serdes and the windowed-serde inner classes in the configuration:
...
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class);
...
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, JsonNode.class);
props.put(StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS, JsonSerde.class);
var config = new KafkaStreamsConfiguration(props);
return new StreamsBuilderFactoryBean(config);
The error I get is:
org.apache.kafka.streams.errors.StreamsException: ClassCastException invoking Processor.
Do the Processor's input types match the deserialized types?
Check the Serde setup and change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
Make sure the Processor can accept the deserialized input of type key:
org.apache.kafka.streams.kstream.Windowed,
and value:
org.apache.kafka.streams.kstream.internals.Change.
Caused by:
java.lang.ClassCastException: class org.apache.kafka.streams.kstream.Windowed
cannot be cast to class java.lang.String (org.apache.kafka.streams.kstream.Windowed is in unnamed module of loader 'app';
java.lang.String is in module java.base of loader 'bootstrap')
I see that count() returns KTable<Windowed<Object>, Long>. So it looks like the problem is that it wants a Windowed<String> serde for the key. Apparently DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS is not enough.
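To check my understanding of the ClassCastException, here is a minimal plain-Java sketch with no Kafka involved. It assumes that the default String key serde ends up being applied to the Windowed key produced by count(), which is what the error message suggests. WindowedKey, STRING_SERIALIZER, WINDOWED_SERIALIZER and describe are hypothetical names of mine, not Kafka Streams APIs.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

public class ErasedSerdeDemo {

    // Hypothetical stand-in for a windowed key; this is NOT Kafka's Windowed class.
    public static final class WindowedKey {
        final String key;
        final long windowStartMs;

        public WindowedKey(String key, long windowStartMs) {
            this.key = key;
            this.windowStartMs = windowStartMs;
        }
    }

    // Plays the role of a default key serializer configured for plain String keys.
    public static final Function<String, byte[]> STRING_SERIALIZER =
            s -> s.getBytes(StandardCharsets.UTF_8);

    // Plays the role of a serializer that understands the windowed key type.
    public static final Function<WindowedKey, byte[]> WINDOWED_SERIALIZER =
            w -> (w.key + "@" + w.windowStartMs).getBytes(StandardCharsets.UTF_8);

    // Mimics a framework that holds a serializer without knowing the key type.
    // The unchecked cast compiles because generics are erased; a mismatch only
    // surfaces when the serializer is actually invoked.
    @SuppressWarnings("unchecked")
    static byte[] serializeWithDefault(Function<?, byte[]> serializer, Object key) {
        return ((Function<Object, byte[]>) serializer).apply(key);
    }

    // Reports either the serialized size or the cast failure.
    public static String describe(Function<?, byte[]> serializer, Object key) {
        try {
            return "ok: " + serializeWithDefault(serializer, key).length + " bytes";
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        WindowedKey windowed = new WindowedKey("user-1", 0L);
        System.out.println(describe(STRING_SERIALIZER, "user-1"));   // ok: 6 bytes
        System.out.println(describe(STRING_SERIALIZER, windowed));   // ClassCastException
        System.out.println(describe(WINDOWED_SERIALIZER, windowed)); // ok: 8 bytes
    }
}
```

If that assumption holds, the fix has to put a serde for the right key type on the windowed table, rather than relying on the default String serde.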
How do I create and set such a serde?
I think I ran into this bug:
https://issues.apache.org/jira/browse/KAFKA-9259
I added a Materialized to the count() method:
var store = Stores.persistentTimestampedWindowStore(
"some-state-store",
Duration.ofMinutes(5),
Duration.ofMinutes(2),
false);
var materialized = Materialized
.<String, Long>as(store)
.withKeySerde(Serdes.String());
Now the code runs without exceptions.