Flink Kafka - 自定义类数据始终为空



Custom Class

class Person
{
private Integer id;
private String name; 
//getters and setters
}

夫卡Flink连接器

TypeInformation<Person> info = TypeInformation.of(Person.class);
TypeInformationSerializationSchema schema = new TypeInformationSerializationSchema(info, new ExecutionConfig());
DataStream<Person> input = env.addSource( new FlinkKafkaConsumer08<>("persons", schema , getKafkaProperties()));

现在,如果我发送以下json

{ "id" : 1, "name": Synd }

通过 Kafka 控制台生产者,flink 代码抛出空指针异常但是,如果我使用SimpleStringSchema而不是之前定义的自定义架构,则会打印流。

上述设置有什么问题

TypeInformationSerializationSchema是一个反/序列化模式,它使用 Flink 的序列化堆栈,因此也使用它的序列化程序。因此,当使用此SerializationSchemaFlink 期望数据已使用 Flink 的序列化程序对Person类型进行序列化。

鉴于Person类的摘录,Flink 很可能会使用它的PojoTypeSerializer。此序列化程序无法理解馈送 JSON 输入数据。

如果要使用 JSON 作为输入格式,则必须定义自己的DeserializationSchema,该可以将 JSON 解析为Person

回答有相同问题的人

自定义序列化程序

class PersonSchema implements DeserializationSchema<Person>{
private ObjectMapper mapper = new ObjectMapper(); //com.fasterxml.jackson.databind.ObjectMapper;
@Override
public Person deserialize(byte[] bytes) throws IOException {
return mapper.readValue( bytes, Person.class );
}
@Override
public boolean isEndOfStream(Person person) {
return false;
}
@Override
public TypeInformation<Person> getProducedType() {
return TypeInformation.of(new TypeHint<Person>(){});
}
}

使用架构

DataStream<Person> input = env.addSource( new FlinkKafkaConsumer08<>("persons", new PersonSchema() , getKafkaProperties()));

相关内容

  • 没有找到相关文章

最新更新