如何在 apache flink 流中从关系数据库中读取数据



如何使用自定义数据源从关系数据库中读取数据。我是眨眼流媒体的新手。我在添加新的自定义数据源时遇到问题。因此,请帮助我添加自定义数据源并从源数据库连续读取数据

正如成志所建议的那样,关系数据库不是为以流方式处理而设计的,最好使用 Kafka、Kinesis 或其他系统。

但是,您可以编写一个自定义源函数,该函数使用 JDBC 连接来获取数据。它必须不断向数据库查询任何新数据。这里的问题是,您需要一种方法来确定哪些数据已经读取/处理,哪些数据没有读取/处理。从我的头顶上,您可以使用一些事情,例如记住上次处理的主键是什么,并在后续查询中使用它,例如:

SELECT * FROM events WHERE event_id > $last_processed_event_id;

或者,您可以在一些事务中清除events表,例如:

SELECT * FROM unprocessed_events; DELETE FROM unprocessed_events WHERE event_id IN $PROCESSED_EVENT_IDS;

event_id可以是允许您唯一标识记录的任何内容,也可能是某个时间戳或一组字段。

要考虑的另一件事是,如果要提供任何合理的at-least-onceexactly-once保证,则必须手动处理检查点(last_processed_even_id偏移量)。

相关内容

  • 没有找到相关文章

最新更新