我已经用Scala编写的ApacheFlink API创建了一个Kafka消费者。每当我从一个话题传递一些信息时,它就会适时地接收它们。但是,当我重新启动使用者时,它不是接收新的或未使用的消息,而是使用发送到该主题的最新消息。
以下是我正在做的:
-
运行生产者:
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic corr2
-
运行消费者:
val properties = new Properties() properties.setProperty("bootstrap.servers", "localhost:9092") properties.setProperty("zookeeper.connect", "localhost:2181") properties.setProperty("group.id", "test") val env = StreamExecutionEnvironment.getExecutionEnvironment val st = env .addSource(new FlinkKafkaConsumer09[String]("corr2", new SimpleStringSchema(), properties)) env.enableCheckpointing(5000) st.print() env.execute()
-
传递一些消息
- 停止消费者
- 再次运行消费者会打印我发送的最后一条消息。我希望它只打印新邮件
您正在运行一个Kafka消费者,检查点间隔为5秒。因此,每隔5秒,Flink就会创建一份操作员状态(偏移量)的副本,用于恢复。
一旦检查点完成,它将让操作员知道检查点已经完成。在该通知中,Kafka消费者将偏移提交给Zookeeper。因此,大约每5秒,我们就会将最后一个检查点的偏移量写入ZK。
当你再次启动Flink作业时,它会在ZK中找到偏移量,然后继续。根据时间的不同,提交到ZK之后收到的所有消息都将再次发送。
您无法避免这种行为,因为.print()
"运算符"不是检查点的一部分。它的意思是作为一个调试实用程序。但是,参与检查点的数据接收器(例如滚动文件接收器)将确保不会向文件系统写入重复项。