将Flink流写入关系数据库



我正在处理一个flink项目,该项目将流写入关系数据库。

在当前解决方案中,我们编写了一个自定义的接收器功能,该功能可以打开事务,执行SQL插入语句和关闭事务。它运行良好,直到数据量增加,我们开始遇到连接超时问题。我们尝试了一些连接池配置调整,它没有太大帮助。

我们正在考虑尝试"批处理插入",因此要减少数据库的"写入"数量。我们遇到了几乎可以做我们想要的几个类:jdbcoutputformat,jdbcsinkfunction。使用JDBCoutputFormat,我们可以配置批处理大小。

,如果记录的数量不超过"批处理大小",我们还希望每1分钟强制一次"批处理插入"。您通常如何处理这类问题?我的第一个想法是将JDBCoutputFormat扩展到使用计划的任务每1分钟强制冲洗,但这并不明显如何完成。

我们必须一起写自己的水槽吗?

更新:

JDBCSinkFunction每次弗林克检查点进行齐平并执行批处理。只要您进行检查点,批次就不会比检查点间隔了。

但是,在阅读了此邮件列表线程后,我看到JDBCSinkFunction不完全支持一致输出。

相关内容

  • 没有找到相关文章

最新更新