我想通过解释下面的场景来解释我的问题陈述。
场景:我正在使用flink的process_continuous模式使用flink+java8进行连续文件读取。
这实际上是一种批量读取功能,其中不同的文件将在一天中的不同时间接收。假设file_1.csv在下午三点到达,那么flink作业就会读取这个文件。同样,file-2.csv在下午3:30到达,然后flink job也会读取这个文件,进程将继续以这种方式工作,直到job停止。我们将这些数据发送给Kafka。
问题:当我重新启动flink作业时,它开始读取所有早期读取文件的数据。这意味着当我重新启动工作时,我一次又一次地得到相同的记录。
是否有办法防止数据重复?
听起来您在重新启动时丢弃了作业的状态。如果您通过从检查点或保存点重新启动来执行有状态重启,那么新作业应该从前一个作业停止的位置开始。
详情见https://ci.apache.org/projects/flink/flink-docs-stable/docs/try-flink/flink-operations-playground/#upgrading--rescaling-a-job