我有一个Flink 1.11作业,它使用来自Kafka主题的消息,对它们进行键控、过滤(keyBy后面跟着一个自定义ProcessFunction(,并通过JDBC接收器将它们保存到数据库中(如下所述:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/jdbc.html)
Kafka消费者使用以下选项进行初始化:
properties.setProperty("auto.offset.reset", "earliest")
kafkaConsumer = new FlinkKafkaConsumer(topic, deserializer, properties)
kafkaConsumer.setStartFromGroupOffsets()
kafkaConsumer.setCommitOffsetsOnCheckpoints(true)
已在群集上启用检查点。
我想要实现的是保证将所有过滤后的数据保存到数据库中,即使数据库关闭了6个小时,或者在保存到数据库时出现编程错误,并且需要更新、重新部署和重新启动作业。
为了实现这一点,Kafka偏移的任何检查点都应该意味着
- 从Kafka读取的数据处于Flink运算符状态,等待过滤/传递到接收器,并将作为Flink运算符检查点的一部分进行检查点,OR
- 从Kafka读取的数据已经提交到数据库中
在查看JdbcSink的实现时,我发现它并没有真正保留任何将被检查点/恢复的内部状态,相反,它的检查点是对数据库的写入。现在,如果这个写操作在检查点期间失败,并且Kafka偏移确实得到了保存,那么我将处于这样一种情况:;丢失";数据-从Kafka的后续读取将从提交的偏移量恢复,当数据库写入失败时,任何正在运行的数据现在都不再从Kaf卡读取,也不在数据库中。
那么,当一个完整的管道(Kafka->Flink->DB(无法执行时,有没有办法停止推进Kafka偏移量?或者这里的解决方案(在1.13之前的世界中(可能是创建我自己的GenericJdbcSinkFunction实现,它将保持一些ValueState,直到数据库写入成功?
我可以看到三个选项:
- 试用Flink版本的JDBC 1.13连接器。这很有可能会奏效
- 如果这不能立即生效,请检查是否可以将其回退到1.11。不应该有太多的改变
- 通过扩展
TwoPhaseCommitSinkFunction
或使用CheckpointedFunction
和CheckpointListener
实现您自己的SinkFunction
,编写您自己的2阶段提交接收器。基本上,您在成功的检查点之后创建一个新事务,并使用notifyCheckpointCompleted
提交它