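I am writing an Apache Flink streaming application that deserializes data (in Avro format) read from a Kafka bus (more details here). The data is being deserialized into a Scala case class. When I run the program, I get an exception as soon as it receives the first message from Kafka: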
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: java.lang.RuntimeException: java.lang.NoSuchMethodException: org.myorg.quickstart.DeviceData.<init>()
at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:625)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:121)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
at org.myorg.quickstart.StreamingKafkaClient$.main(StreamingKafkaClient.scala:26)
at org.myorg.quickstart.StreamingKafkaClient.main(StreamingKafkaClient.scala)
Caused by: java.lang.RuntimeException: java.lang.NoSuchMethodException: org.myorg.quickstart.DeviceData.<init>()
at org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:353)
at org.apache.avro.specific.SpecificData.newRecord(SpecificData.java:369)
at org.apache.avro.reflect.ReflectData.newRecord(ReflectData.java:901)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:212)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
at org.myorg.quickstart.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.scala:20)
at org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.deserialize(KeyedDeserializationSchemaWrapper.java:44)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:142)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:738)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoSuchMethodException: org.myorg.quickstart.DeviceData.<init>()
at java.lang.Class.getConstructor0(Class.java:3082)
at java.lang.Class.getDeclaredConstructor(Class.java:2178)
at org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:347)
... 16 more
Process finished with exit code 1
The Scala case class is very simple:
package org.myorg.quickstart
/** Case class to hold the Device data. */
case class DeviceData(deviceId: String,
                      sw_version: String,
                      timestamp: String,
                      reading: Double
                     )
I'm not sure why the case class needs an "init" method. Is there an example of how to do this? Should I use a data structure other than a case class?
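The Avro serializer, or more specifically SpecificData, requires the target type to have a default constructor (a constructor with no parameters). Otherwise, Avro cannot instantiate an object of the target type. The "<init>()" in the exception is the JVM's internal name for that no-argument constructor, not a method called "init".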
Try adding a default constructor as follows:
case class DeviceData(
    deviceId: String,
    sw_version: String,
    timestamp: String,
    reading: Double) {
  def this() = this("default", "default", "default", 0)
}
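To see why the extra constructor matters, here is a minimal sketch that makes the same reflection call that appears in the stack trace (Class.getDeclaredConstructor with no arguments) against a case class with and without an auxiliary no-argument constructor. The names WithoutDefault, WithDefault, and DefaultConstructorCheck are hypothetical and exist only for this illustration. It uses plain JVM reflection and does not involve Avro or Flink.

```scala
// Hypothetical classes for illustration only; they are not part of the original job.
case class WithoutDefault(deviceId: String, reading: Double)

case class WithDefault(deviceId: String, reading: Double) {
  // Auxiliary no-argument constructor; on the JVM this is compiled to <init>().
  def this() = this("default", 0.0)
}

object DefaultConstructorCheck {
  // Returns true if the class declares a no-argument constructor,
  // using the same lookup that fails in the stack trace above.
  def hasNoArgConstructor(cls: Class[_]): Boolean =
    try {
      cls.getDeclaredConstructor()
      true
    } catch {
      case _: NoSuchMethodException => false
    }

  def main(args: Array[String]): Unit = {
    println(hasNoArgConstructor(classOf[WithoutDefault])) // false
    println(hasNoArgConstructor(classOf[WithDefault]))    // true

    // With the no-argument constructor present, reflective instantiation works.
    val instance = classOf[WithDefault].getDeclaredConstructor().newInstance()
    println(instance) // WithDefault(default,0.0)
  }
}
```

The placeholder values passed to the primary constructor are only there so that an instance can be created. They are expected to be replaced once the record's fields are populated during deserialization.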