在从Kafka接收消息的Flink流应用程序中,1) 如何禁用自动提交?2) 成功处理消息后,如何从Flink手动提交?
谢谢。
默认情况下,Flink在检查点上提交偏移量。您可以按如下方式禁用它:
val consumer = new FlinkKafkaConsumer011[T](...)
c.setCommitOffsetsOnCheckpoints(false)
如果您没有启用检查点,请参阅此处
你为什么要这么做?Flink的检查点机制可以为您解决这个问题。Flink不会在出现故障时提交偏移。如果您在Kafka消费者下游的某个点抛出异常,Flink将尝试从上一个成功的检查点重新启动流。如果错误仍然存在,则Flink将在流失败之前重复重新启动配置的次数。这意味着你不太可能因为Flink提交你的代码没有成功处理的消息的偏移量而丢失消息。