Flink Streaming JdbcSink异常处理



我正在使用JdbcSink将已处理的事件插入Postgres数据库。

偶尔,我会收到来自源流的坏记录,并且它无法插入数据库(java.sql.BatchUpdateException(,因为它无法满足某些表约束。

很明显,我可以通过Flink过滤器操作符来过滤事件,但过滤器会变成一个复杂的代码来检查每一个可能的失败组合。我不想使用过滤器,而是想捕获JdbcSink引发的BatchUpdateException,记录它并继续处理其他事件。

尝试找到从JdbcSink捕获BatchUpdateException的方法时运气不佳。有人尝试过类似的做法并取得成功吗?

我快速查看了一下代码,没有发现明显的解决方案。您可以扩展JdbcOutputFormat类并覆盖attemptFlush方法,然后克隆JdbcSink类并修改您的版本以使用输出格式类。

最新更新