我将Apache Flink与Kaffa一起使用,在那里我可以正确地使用主题中的消息并对其进行反序列化。
我使用flink数据流映射函数来处理收到的kafka消息,但flink希望我的所有业务对象都是可序列化的。
下面是伪代码
TestRequestListener listener = this.context.getBean(TestRequestListener.class);
KafkaConsumer<KafkaTestBatch> kafkaConsumer = KafkaSource<>.builder()...build();
StreamExecutionEnviroment env = ...
DataStream<KafkaTestBatch> kafkaSource = env.fromSource(kafkaConsumer,...);
kafkaSource.map(kafkaTestBatch -> {
listener.listen(kafkaTestBatch);
});
这里,TestRequestListener具有业务逻辑,它甚至可以在其中查询数据库。
Flink希望TestRequestListener及其所有依赖项都是可序列化的。我的用例正确吗?我使用的燧石正确吗?
如果使用RichMapFunction
,则可以在其open
方法中实例化侦听器;那么你就不必担心它的序列化了
不过,您应该小心,不要在Flink函数中阻塞i/o。最好使用Flink的异步i/o操作符。对于查询数据库,最好将外部DB作为表查找源进行访问,或者将其数据作为CDC源进行流式传输。