"WindowedBy Count KStream"抛出流异常



我试图从kStream计数事件,到时间段:

    KStream<String, VehicleEventTO> stream = builder.stream("vehicle", Consumed.with(Serdes.String(), new JsonSerde<>(VehicleEventTO.class)));
    KStream<String, VehicleEventTO> streamWithKey = stream.selectKey((key, value) -> value.getId_vehicle().toString());
    KStream<String, Long> streamValueKey = streamWithKey.map((key, value) -> KeyValue.pair(key, value.getId_vehicle()));
    streamValueKey.groupByKey()
                  .windowedBy(TimeWindows.of(Duration.ofMinutes(10).toMillis()))
                  .count(Materialized.with(Serdes.String(), new JsonSerde<>(Long.class)));

我有这个例外:

线程中的例外 " Test-App-87CE164D-C427-4DCF-AA76-AEEB6F8FC943-StreamThread-1" org.apache.kafka.streams.errors.StreamSexception:捕获的例外 过程。taskID = 0_0,processor = kstream-source-0000000000, 主题=车辆,分区= 0,offset = 160385 org.apache.kafka.streams.processor.internals.streamtask.process(streamtask.java:318( 在 org.apache.kafka.streams.processor.internals.assignedStreamStasks.Process(aensededStreamStasks.java:94( 在 org.apache.kafka.streams.processor.internals.taskmanager.process(taskmanager.java:409( 在 org.apache.kafka.streams.processor.internals.streamthread.processandmaybecommit(streamthread.java:964( 在 org.apache.kafka.streams.processor.internals.streamthread.runonce(streamthread.java:832( 在 org.apache.kafka.streams.processor.internals.streamthread.runloop(streamthread.java:767( 在 org.apache.kafka.streams.processor.internals.streamthread.run(streamThread.java:736( 引起:org.apache.kafka.streams.errors.streamsexception:a 序列化器(键: org.apache.kafka.common.serialization.bytearrayserializer/value: org.apache.kafka.common.serialization.bytearrayserializer(不是 与实际的密钥或值类型兼容(密钥类型:java.lang.string /值类型:java.lang.long(。在 StreamConfig或通过方法参数提供正确的SERD。

groupByKey()使用默认串行器:

groupByKey()

将记录按当前密钥分组为 kgroupedstream保留原始值和默认值 串行序列和避难所。

您必须使用groupByKey(Serialized<K,V> serialized)groupByKey(Grouped<K,V> grouped)

以下内容应该解决:

streamValueKey.groupByKey(Serialized.with(Serdes.String(), Serdes.Long()))
              .windowedBy(TimeWindows.of(Duration.ofMinutes(10).toMillis()))
              .count(Materialized.with(Serdes.String(), new JsonSerde<>(Long.class)));

相关内容

  • 没有找到相关文章

最新更新