我在 flink 流作业中看到一种奇怪的行为。这是我的代码
streamExecutionEnvironment.enableCheckpointing(checkPointInterval, CheckpointingMode.EXACTLY_ONCE);
streamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
ExecutionConfig executionConfig = streamExecutionEnvironment.getConfig();
executionConfig.disableForceKryo();
executionConfig.enableForceAvro();
Path path = new Path(outputPath);
CheckpointConfig config = streamExecutionEnvironment.getCheckpointConfig();
config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
String mutateConfig = IOUtils.toString(EventProcessor.class.getClassLoader().getResourceAsStream(configFile));
FlinkKafkaConsumer flinkKafkaConsumer = new FlinkKafkaConsumer(topics,
new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
properties);
flinkKafkaConsumer.setCommitOffsetsOnCheckpoints(true);
DataStream<GenericRecord> dataStream = streamExecutionEnvironment.addSource(flinkKafkaConsumer).name("booking_flow_source");
DataStream<GenericRecord> enrichDataStream = dataStream.map(new MapFunction<GenericRecord, GenericRecord>() {
private transient Mutator mutator;
@Override
public GenericRecord map(GenericRecord record) {
GenericRecord mutateRecord=record;
try {
mutator = new Mutator(mutateConfig);
mutateRecord = mutator.mutate(record);
} catch (Exception e) {
e.printStackTrace();
}
return mutateRecord;
}
});
enrichDataStream.print();
到目前为止,此代码工作正常。现在我需要从我的 avro 模式生成 java 类,所以我包含了这个 avro 依赖项。
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.9.1</version>
</dependency>
在我的pom中包含此内容后,我的代码停止工作,并且出现异常:
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
Serialization trace:
props (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
即使我在我的代码中禁用 kryo 并强制 avro,我仍然得到相同的异常。 如果我删除此依赖项,则代码将正常工作并且我的流将被打印。
所以我无法通过添加 avro 依赖项来理解正在更改的内容。
请帮忙
我也有类似的问题。我修复了它设置类加载器.解析顺序:flink配置中的父优先。
avro 的版本似乎太高了,你可以试试 avro 1.8.2 而不是 1.9.1