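I am using Apache Flink (v1.11) in Scala and have added my own DeserializationSchema for the Kafka connector. For that, I would like to use my own packaged version of Jackson (v2.12.0).
But I get the following error: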
Exception in thread "main" java.lang.VerifyError: Cannot inherit from final class
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at com.fasterxml.jackson.dataformat.csv.CsvMapper.<init>(CsvMapper.java:108)
at de.integration_factory.datastream.types.CovidEventSchema.<init>(CovidEventSchema.scala:14)
at de.integration_factory.datastream.Aggregate_Datastream$.main(Aggregate_Datastream.scala:34)
at de.integration_factory.datastream.Aggregate_Datastream.main(Aggregate_Datastream.scala)
This is my EventSchema:
import java.io.IOException

import com.fasterxml.jackson.dataformat.csv.CsvMapper
import com.fasterxml.jackson.datatype.joda.JodaModule
import org.apache.flink.api.common.serialization.{DeserializationSchema, SerializationSchema}
import org.apache.flink.api.common.typeinfo.TypeInformation

@SerialVersionUID(6154188370181669758L)
class CovidEventSchema extends DeserializationSchema[CovidEvent] with SerializationSchema[CovidEvent] {

  // The mapper is created eagerly, i.e. wherever the schema object is constructed
  private val mapper = new CsvMapper
  mapper.registerModule(new JodaModule)

  val csvSchema = mapper
    .schemaFor(classOf[CovidEvent])
    .withLineSeparator(",")
    .withoutHeader()

  val reader = mapper.readerWithSchemaFor(classOf[CovidEvent])

  def serialize(event: CovidEvent): Array[Byte] = mapper.writer(csvSchema).writeValueAsBytes(event)

  @throws[IOException]
  def deserialize(message: Array[Byte]): CovidEvent = reader.readValue[CovidEvent](message)

  def isEndOfStream(nextElement: CovidEvent) = false

  def getProducedType: TypeInformation[CovidEvent] = TypeInformation.of(classOf[CovidEvent])
}
This is my POJO:
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.joda.time.DateTime;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class CovidEvent {
    private long objectId;
    private int bundeslandId;
    private String bundesland;
    private String landkreis;
    private String altersgruppe;
    private String geschlecht;
    private int anzahlFall;
    private int anzahlTodesfall;
    @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss", timezone = "UTC")
    private DateTime meldedatum;
    private int landkreisId;
    private String datenstand;
    private int neuerFall;
    private int neuerTodesfall;
    private String refDatum;
    private int neuGenesen;
    private int anzahlGenesen;
    @JsonFormat(shape = JsonFormat.Shape.NUMBER)
    private boolean istErkrankungsbeginn;
    private String altersGruppe2;

    public long getEventtime() {
        return meldedatum.getMillis();
    }
}
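After some research, I found that this error is probably caused by different Jackson versions on the classpath.
I thought it would be possible to use my own version of Jackson, because Flink shades its own copy.
What am I doing wrong?
UPDATE: If I import the Jackson classes from Flink's shaded package, it works:
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper
But then I depend on the Jackson version that Flink shades.
UPDATE: So would a better implementation, using open, look like this?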
class CovidEventSchema extends DeserializationSchema[CovidEvent] with SerializationSchema[CovidEvent] {
  private var reader: ObjectReader = null
  private var writer: ObjectWriter = null

  override def open(context: SerializationSchema.InitializationContext): Unit = {
    val mapper = new CsvMapper()
    val csvSchema = mapper
      .schemaFor(classOf[CovidEvent])
      .withLineSeparator(",")
      .withoutHeader()
    this.reader = mapper.readerFor(classOf[CovidEvent]).`with`(csvSchema)
    this.writer = mapper.writer(csvSchema)
    super.open(context)
  }
}
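It would work if Flink's class loader were used. However, the way your setup works, your user code is loaded into the system class loader at the point where the whole DataStream application is created. I won't go into more detail (unless asked in a follow-up) and will go straight to the solution:
Your DeserializationSchema should not initialize heavy resources when it is constructed (that happens on the client or job manager side), but only in open (which runs on the task manager). So please move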
private val mapper = new CsvMapper
mapper.registerModule(new JodaModule)
into open.
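It only works with the bundled version because, luckily, ObjectMapper implements Serializable, but that is rarely the case for parsers, and it is completely unnecessary anyway if the deserializer is initialized correctly.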
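To make that concrete, here is a minimal sketch of a schema that defers all Jackson setup to open. It assumes Flink 1.11 or later (where both DeserializationSchema and SerializationSchema offer an open hook) and the CovidEvent class from the question. The class name LazyCovidEventSchema and the helper initJackson are made up for illustration. It also leaves out the withLineSeparator(",") call and relies on CsvSchema's default comma column separator, which is my assumption about what was intended. I have not run it against your job, so treat it as a starting point rather than a verified fix.

```scala
import java.io.IOException

import com.fasterxml.jackson.databind.{ObjectReader, ObjectWriter}
import com.fasterxml.jackson.dataformat.csv.CsvMapper
import com.fasterxml.jackson.datatype.joda.JodaModule
import org.apache.flink.api.common.serialization.{DeserializationSchema, SerializationSchema}
import org.apache.flink.api.common.typeinfo.TypeInformation

// Hypothetical class name; CovidEvent is the POJO from the question.
class LazyCovidEventSchema
    extends DeserializationSchema[CovidEvent]
    with SerializationSchema[CovidEvent] {

  // @transient: these are not shipped with the serialized schema object;
  // they are built in open() instead.
  @transient private var reader: ObjectReader = _
  @transient private var writer: ObjectWriter = _

  // Hypothetical helper shared by both open() overloads.
  private def initJackson(): Unit = {
    if (reader == null) {
      val mapper = new CsvMapper
      mapper.registerModule(new JodaModule)
      // Assumption: comma-separated columns (the CsvSchema default), no header row.
      val csvSchema = mapper.schemaFor(classOf[CovidEvent]).withoutHeader()
      reader = mapper.readerFor(classOf[CovidEvent]).`with`(csvSchema)
      writer = mapper.writer(csvSchema)
    }
  }

  // Hook for when the schema is used to read records (e.g. by a Kafka source).
  override def open(context: DeserializationSchema.InitializationContext): Unit = initJackson()

  // Hook for when the schema is used to write records (e.g. by a Kafka sink).
  override def open(context: SerializationSchema.InitializationContext): Unit = initJackson()

  override def serialize(event: CovidEvent): Array[Byte] = writer.writeValueAsBytes(event)

  @throws[IOException]
  override def deserialize(message: Array[Byte]): CovidEvent = reader.readValue[CovidEvent](message)

  override def isEndOfStream(nextElement: CovidEvent): Boolean = false

  override def getProducedType: TypeInformation[CovidEvent] = TypeInformation.of(classOf[CovidEvent])
}
```

Note that the snippet in the question's update overrides only the SerializationSchema variant of open. As far as I can tell, a source that only deserializes would call the DeserializationSchema variant, which is why the sketch initializes from both.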