Custom Class
人
class Person
{
private Integer id;
private String name;
//getters and setters
}
卡夫卡Flink连接器
TypeInformation<Person> info = TypeInformation.of(Person.class);
TypeInformationSerializationSchema schema = new TypeInformationSerializationSchema(info, new ExecutionConfig());
DataStream<Person> input = env.addSource( new FlinkKafkaConsumer08<>("persons", schema , getKafkaProperties()));
现在,如果我发送以下json
{ "id" : 1, "name": Synd }
通过 Kafka 控制台生产者,flink 代码抛出空指针异常但是,如果我使用SimpleStringSchema
而不是之前定义的自定义架构,则会打印流。
上述设置有什么问题
TypeInformationSerializationSchema
是一个反/序列化模式,它使用 Flink 的序列化堆栈,因此也使用它的序列化程序。因此,当使用此SerializationSchema
Flink 期望数据已使用 Flink 的序列化程序对Person
类型进行序列化。
鉴于Person
类的摘录,Flink 很可能会使用它的PojoTypeSerializer
。此序列化程序无法理解馈送 JSON 输入数据。
如果要使用 JSON 作为输入格式,则必须定义自己的DeserializationSchema
,该可以将 JSON 解析为Person
。
回答有相同问题的人
自定义序列化程序
class PersonSchema implements DeserializationSchema<Person>{
private ObjectMapper mapper = new ObjectMapper(); //com.fasterxml.jackson.databind.ObjectMapper;
@Override
public Person deserialize(byte[] bytes) throws IOException {
return mapper.readValue( bytes, Person.class );
}
@Override
public boolean isEndOfStream(Person person) {
return false;
}
@Override
public TypeInformation<Person> getProducedType() {
return TypeInformation.of(new TypeHint<Person>(){});
}
}
使用架构
DataStream<Person> input = env.addSource( new FlinkKafkaConsumer08<>("persons", new PersonSchema() , getKafkaProperties()));