apache flink with Kafka: InvalidTypesException



我有以下代码:

Properties properties = new Properties();
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyCustomClassDeserializer.class.getName());
FlinkKafkaConsumer<MyCustomClass> kafkaConsumer = new FlinkKafkaConsumer(
"test-kafka-topic",
new SimpleStringSchema(),
properties);
final StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<MyCustomClass> kafkaInputStream = streamEnv.addSource(kafkaConsumer);
DataStream<String> stringStream = kafkaInputStream
.map(new MapFunction<MyCustomClass,String>() {
@Override
public String map(MyCustomClass message) {
logger.info("--- Received message : " + message.toString());
return message.toString();
}
});
streamEnv.execute("Published messages");

MyCustomClassDeserializer用于将字节数组转换为java对象。

当我在本地运行这个程序时,我会得到错误:

导致原因:org.apache.flink.api.common.functions.InvalidTypesException:输入不匹配:应为基本类型。

我得到这个代码行:

.map(new MapFunction<MyCustomClass,String>() {

不知道我为什么得到这个?

因此,您有一个返回POJO的反序列化程序,但您告诉Flink,它应该使用SimpleStringSchema将记录从byte[]反序列化为String。现在看到问题了吗?:(

我认为您一般不应该在FlinkKafkaConsumer中使用自定义的Kafka反序列化程序。相反,您应该创建一个从Flink扩展DeserializationSchema的自定义类。它在类型安全性和可测试性方面应该要好得多。

相关内容

  • 没有找到相关文章

最新更新