我试图从kStream计数事件,到时间段:
KStream<String, VehicleEventTO> stream = builder.stream("vehicle", Consumed.with(Serdes.String(), new JsonSerde<>(VehicleEventTO.class)));
KStream<String, VehicleEventTO> streamWithKey = stream.selectKey((key, value) -> value.getId_vehicle().toString());
KStream<String, Long> streamValueKey = streamWithKey.map((key, value) -> KeyValue.pair(key, value.getId_vehicle()));
streamValueKey.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMinutes(10).toMillis()))
.count(Materialized.with(Serdes.String(), new JsonSerde<>(Long.class)));
我有这个例外:
线程中的例外 " Test-App-87CE164D-C427-4DCF-AA76-AEEB6F8FC943-StreamThread-1" org.apache.kafka.streams.errors.StreamSexception:捕获的例外 过程。taskID = 0_0,processor = kstream-source-0000000000, 主题=车辆,分区= 0,offset = 160385 org.apache.kafka.streams.processor.internals.streamtask.process(streamtask.java:318( 在 org.apache.kafka.streams.processor.internals.assignedStreamStasks.Process(aensededStreamStasks.java:94( 在 org.apache.kafka.streams.processor.internals.taskmanager.process(taskmanager.java:409( 在 org.apache.kafka.streams.processor.internals.streamthread.processandmaybecommit(streamthread.java:964( 在 org.apache.kafka.streams.processor.internals.streamthread.runonce(streamthread.java:832( 在 org.apache.kafka.streams.processor.internals.streamthread.runloop(streamthread.java:767( 在 org.apache.kafka.streams.processor.internals.streamthread.run(streamThread.java:736( 引起:org.apache.kafka.streams.errors.streamsexception:a 序列化器(键: org.apache.kafka.common.serialization.bytearrayserializer/value: org.apache.kafka.common.serialization.bytearrayserializer(不是 与实际的密钥或值类型兼容(密钥类型:java.lang.string /值类型:java.lang.long(。在 StreamConfig或通过方法参数提供正确的SERD。
groupByKey()
使用默认串行器:
groupByKey()
将记录按当前密钥分组为 kgroupedstream保留原始值和默认值 串行序列和避难所。
您必须使用groupByKey(Serialized<K,V> serialized)
或groupByKey(Grouped<K,V> grouped)
。
以下内容应该解决:
streamValueKey.groupByKey(Serialized.with(Serdes.String(), Serdes.Long()))
.windowedBy(TimeWindows.of(Duration.ofMinutes(10).toMillis()))
.count(Materialized.with(Serdes.String(), new JsonSerde<>(Long.class)));