用于业务逻辑类的Apache Flink NoSerialization



我将Apache Flink与Kaffa一起使用,在那里我可以正确地使用主题中的消息并对其进行反序列化。

我使用flink数据流映射函数来处理收到的kafka消息,但flink希望我的所有业务对象都是可序列化的。

下面是伪代码

TestRequestListener listener = this.context.getBean(TestRequestListener.class);
KafkaConsumer<KafkaTestBatch> kafkaConsumer = KafkaSource<>.builder()...build();
StreamExecutionEnviroment env = ...
DataStream<KafkaTestBatch> kafkaSource = env.fromSource(kafkaConsumer,...);
kafkaSource.map(kafkaTestBatch -> {
listener.listen(kafkaTestBatch);
});

这里,TestRequestListener具有业务逻辑,它甚至可以在其中查询数据库。

Flink希望TestRequestListener及其所有依赖项都是可序列化的。我的用例正确吗?我使用的燧石正确吗?

如果使用RichMapFunction,则可以在其open方法中实例化侦听器;那么你就不必担心它的序列化了

不过,您应该小心,不要在Flink函数中阻塞i/o。最好使用Flink的异步i/o操作符。对于查询数据库,最好将外部DB作为表查找源进行访问,或者将其数据作为CDC源进行流式传输。

最新更新