Kafka Streams example throws a ClassCastException (serde: Windowed -> String). How do I set the correct Serde?



I am trying to reproduce this example. My topology is:

@Bean("myTopo")
public KStream<Object, Object> getTopo(@Qualifier("myKConfig") StreamsBuilder builder) {
    var stream = builder.stream("my-events");
    stream.groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofMinutes(2)))
        .count()
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
        .toStream()
        .foreach((k, v) -> {
            System.out.println("k + v = " + k + " --- " + v);
        });
    return stream;
}

I have set the default serdes and the windowed serde inner classes in the config:

...
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class);
...
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, JsonNode.class);
props.put(StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS, JsonSerde.class);
var config = new KafkaStreamsConfiguration(props);
return new StreamsBuilderFactoryBean(config);

The error I get is:

org.apache.kafka.streams.errors.StreamsException: ClassCastException invoking Processor. 
Do the Processor's input types match the deserialized types? 
Check the Serde setup and change the default Serdes in StreamConfig or provide correct Serdes via method parameters. 
Make sure the Processor can accept the deserialized input of type key: 
org.apache.kafka.streams.kstream.Windowed, 
and value: 
org.apache.kafka.streams.kstream.internals.Change.

Caused by:

java.lang.ClassCastException: class org.apache.kafka.streams.kstream.Windowed 
cannot be cast to class java.lang.String (org.apache.kafka.streams.kstream.Windowed is in unnamed module of loader 'app'; 
java.lang.String is in module java.base of loader 'bootstrap')
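
The cast failure in this trace can be reproduced in miniature with plain Java and no Kafka dependency. This is only a sketch of the general mechanism (a String-typed serializer chosen as a default and then handed a windowed key), not a description of Kafka Streams internals. `Windowed`, `Serializer`, and `StringSerializer` below are hypothetical stand-ins, not the Kafka classes of the same names.

```java
import java.nio.charset.StandardCharsets;

public class SerdeCastDemo {
    // Hypothetical stand-in for a windowed key; NOT Kafka's Windowed class.
    public static final class Windowed<K> {
        final K key;
        final long startMs;
        final long endMs;

        public Windowed(K key, long startMs, long endMs) {
            this.key = key;
            this.startMs = startMs;
            this.endMs = endMs;
        }
    }

    // Hypothetical stand-in for a serializer interface.
    public interface Serializer<T> {
        byte[] serialize(T data);
    }

    // Serializer that only knows how to handle String keys.
    public static final class StringSerializer implements Serializer<String> {
        @Override
        public byte[] serialize(String data) {
            return data.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Mimics a framework applying a configured default serializer to whatever
    // key arrives. The generic type is erased, so the compiler cannot object.
    @SuppressWarnings({"unchecked", "rawtypes"})
    public static byte[] serializeWithDefault(Serializer defaultSerializer, Object key) {
        return defaultSerializer.serialize(key);
    }

    // Returns true if serializing the key with the String default fails
    // with a ClassCastException at runtime.
    public static boolean failsWithClassCast(Object key) {
        try {
            serializeWithDefault(new StringSerializer(), key);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(failsWithClassCast("user-1"));
        System.out.println(failsWithClassCast(new Windowed<>("user-1", 0L, 120_000L)));
    }
}
```

A plain String key serializes fine, while the windowed key only fails at runtime, inside the serializer's cast to String. That is why this kind of mismatch shows up as a ClassCastException rather than a compile error.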

I see that count() returns a KTable<Windowed<Object>, Long>. So it looks like the problem is that it wants a Windowed<String> serde for the key. Apparently, setting DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS is not enough.

How do I create and set one?

I think I ran into this bug:

https://issues.apache.org/jira/browse/KAFKA-9259

I added a Materialized to the count() call:

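var store = Stores.persistentTimestampedWindowStore(
    "some-state-store",
    Duration.ofMinutes(5),  // retention period
    Duration.ofMinutes(2),  // window size, same as TimeWindows.of(Duration.ofMinutes(2))
    false);                 // do not retain duplicates
var materialized = Materialized
    .<String, Long>as(store)
    .withKeySerde(Serdes.String());
// Passed to the aggregation in the topology as .count(materialized)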

Now the code runs without any exception.
