Kafka 流在分组和聚合时使用 KTable 投射到字符串问题



我有一个 Kafka 流,其中包含传入的消息,看起来像sensor_code: x, time: 1526978768, address: Y我想创建一个 KTable,用于存储每个传感器代码的每个唯一地址。

可 KTable

KTable<String, Long> numCount = streams
.map(kvm1)
.groupByKey(Serialized.with(stringSerde, stringSerde))
.count()
.groupBy(kvm2, Serialized.with(stringSerde, longSerde))
.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("StateStore"));

kvm1kvm2的地方是我自己的KeyValueMappers.我的想法是用sensor_code=x, address=y替换现有的密钥,执行groupByKey()count()。然后是另一个groupBy(kvm2, Serialized.with(stringSerde, longSerde)),其中kvm2修改现有key以包含sensor_code,然后值将是其计数。但是既然它不起作用,也许我做错了...... 它尝试将其转换为 Long 并引发异常,因为它正在寻找字符串。我要伯爵Long,对吧?

这是我使用的第一个KeyValueMapper及其各自的帮助功能:

private static String getKeySensorIdAddress(String o) {
String x = "sensor_id="x", address="y""; 
try {
WifiStringEvent event = mapper.readValue(o, WifiStringEvent.class);
x = x.replace("x", event.getSensor_code());
x = x.replace("y", event.getAddress());
return x;
} catch(Exception ex) {
System.out.println("Error... " + ex);
return "Error";
}
}
//KeyValueMapper1
KeyValueMapper<String, String, KeyValue<String, String>> kvm1 = 
new KeyValueMapper<String, String, KeyValue<String, String>>() {
public KeyValue<String, String> apply(String key, String value) {
return new KeyValue<>(getKeySensorIdAddress(value), value);
}
};

这是第二个KeyValueMapper及其帮助功能。

private static String getKeySensorId(String o) {
int a = o.indexOf(",");
return o.substring(0,a);
}
//KeyValueMapper2 
KeyValueMapper<String, Long, KeyValue<String, Long>> kvm2 = 
new KeyValueMapper<String, Long, KeyValue<String, Long>>() {
public KeyValue<String, Long> apply(String key, Long value) {
return new KeyValue<>(getKeySensorId(key), value);
}
};

这是我尝试运行代码时返回的异常和错误。

[2018-05-29 15:28:40,119] 错误流线程 [testUniqueAddresses-ed48daf8-fff0-42e4-bb5a-687584734b45-StreamThread-1] 由于以下错误,无法处理流任务 2_0:(org.apache.kafka.streams.processor.internals.AssignedStreamsTasks:105( java.lang.ClassCastException: java.lang.Long 不能强制转换为 java.lang.String at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28( at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:178( at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore$1.innerValue(MeteredKeyValueBytesStore.java:66( at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore$1.innerValue(MeteredKeyValueBytesStore.java:57( at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.put(InnerMeteredKeyValueStore.java:198( at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put(MeteredKeyValueBytesStore.java:117( at org.apache.kafka.streams.kstream.internals.KTableAggregate$KTableAggregateProcessor.process(KTableAggregate.java:95( at org.apache.kafka.streams.kstream.internals.KTableAggregate$KTableAggregateProcessor.process(KTableAggregate.java:56(

请注意java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.String错误。

任何想法为什么我会收到此错误以及如何修复它或建议如何编辑代码以达到我提到的所需输出?

提前非常感谢!

编辑:对我的问题进行了重大改革,因为我放弃了其中一种方法。

在第一种情况下,如果你想使用HashMap作为值类型,你需要为它定义一个自定义serde,并使用Materialized.withValueSerde传递它。

在第二种情况下,如果没有看到 KeyValueMappers 的返回类型和确切的错误消息,我就不能说:它是否试图将字符串转换为长整型,反之亦然?

编辑:感谢您分享额外的信息。

我认为在第二种情况下,您需要在第二个计数操作中指定值 serde。KGroupedStream 和 KGroupedTable 上的 count(( 之间似乎存在不一致,因为前者会自动将值 serde 设置为 LongSerde:

https://github.com/apache/kafka/blob/1.1/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java#L281-L283

但 KGroupedTable 没有:

https://github.com/apache/kafka/blob/1.1/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java#L253

它似乎已经在后备箱上修复,但尚未发布:

https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java#L158-L160

最新更新