在我的问题中,我需要查询数据库,并将查询结果与Flink中的Kafka数据流连接起来。目前,这是通过将查询结果存储在文件中,然后使用Flink的readFile
功能创建查询结果的DataStream
来完成的。有什么更好的方法可以绕过写入文件的中间步骤,直接从查询结果创建DataStream
?
我目前的理解是,我需要按照这里的建议编写一个自定义的SourceFunction
。这是正确和唯一的方法吗?或者还有其他选择吗?
有什么好的资源可以编写自定义SoruceFunctions
吗?还是我应该看看当前的实现以供参考,并根据我的需求定制它们?
一个简单的解决方案是使用查找联接,也许启用了缓存。
其他可能的解决方案包括kafka-connect,或者使用类似Debezium的东西将数据库表镜像到Flink中。以下是一个示例:https://github.com/ververica/flink-sql-CDC.