当我重新运行Flink消费者时,Kafka再次消费了最新的消息



我已经用Scala编写的ApacheFlink API创建了一个Kafka消费者。每当我从一个话题传递一些信息时,它就会适时地接收它们。但是,当我重新启动使用者时,它不是接收新的或未使用的消息,而是使用发送到该主题的最新消息。

以下是我正在做的:

  1. 运行生产者:

    $ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic corr2
    
  2. 运行消费者:

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("zookeeper.connect", "localhost:2181")
    properties.setProperty("group.id", "test")
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val st = env
        .addSource(new FlinkKafkaConsumer09[String]("corr2", new SimpleStringSchema(), properties))
    env.enableCheckpointing(5000)
    st.print()
    env.execute()
    
  3. 传递一些消息

  4. 停止消费者
  5. 再次运行消费者会打印我发送的最后一条消息。我希望它只打印新邮件

您正在运行一个Kafka消费者,检查点间隔为5秒。因此,每隔5秒,Flink就会创建一份操作员状态(偏移量)的副本,用于恢复。

一旦检查点完成,它将让操作员知道检查点已经完成。在该通知中,Kafka消费者将偏移提交给Zookeeper。因此,大约每5秒,我们就会将最后一个检查点的偏移量写入ZK。

当你再次启动Flink作业时,它会在ZK中找到偏移量,然后继续。根据时间的不同,提交到ZK之后收到的所有消息都将再次发送。

您无法避免这种行为,因为.print()"运算符"不是检查点的一部分。它的意思是作为一个调试实用程序。但是,参与检查点的数据接收器(例如滚动文件接收器)将确保不会向文件系统写入重复项。

相关内容

  • 没有找到相关文章

最新更新