Kafka Streams:获取时间窗口中的事件计数



我的数据流为<字符串,字符串>事件。我想获得10分钟时间窗口内的事件计数,并输出到另一个主题。以下是我的代码

StreamsBuilder streamsBuilder = new StreamsBuilder();
streamsBuilder.stream("events")
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMillis(10000)))
.count()
.toStream()
.to("output");

但是我得到错误

ClassCastException while producing data to topic output. A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.LongSerializer) is not compatible to the actual key or value type (key type: org.apache.kafka.streams.kstream.Windowed / value type: java.lang.Long). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters (for example if using the DSL, `#to(String topic, Produced<K, V> produced)` with `Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).

windowedBycount()的结果是类型为<Windowed<String>, Long>的键值对,因此需要通过Produced参数在to()中设置不同的序列号。默认情况下,将使用配置中似乎设置为StringSerde/StringSerde的序列号,这些序列号显然与输出主题键/值类型不匹配。

Kafka Streams附带了用于窗口类型的内置serdes,您可以通过Serdes工厂类获得这些serdes。

相关内容

  • 没有找到相关文章

最新更新