我正在从目录中读取csv文件并进行一些处理。 现在 flink 只是选择该目录中的任何新文件并对其进行处理。这对我来说很好用。
我陷入了两个问题:
- 我想记录 flink 已完成处理的文件名。
- 我想在 flink 完成处理后立即将处理后的文件移动到其他文件夹。
我的代码片段是:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
org.apache.flink.core.fs.Path filePath = new org.apache.flink.core.fs.Path(feedFileFolderPath);
RowCsvInputFormat format = new RowCsvInputFormat(filePath, FetchTypeInformation.getTypeInformation());
DataStream<Row> inputStream = env.readFile(format, feedFileFolderPath, FileProcessingMode.PROCESS_CONTINUOUSLY,
parseInt(folderLookupTime));
这个话题在 flink 邮件列表中出现了几次——请参阅此处和此处的讨论——但简短的总结是,在 Flink 中还没有一种简单的方法来做到这一点。
似乎通常的做法是使用 cron 作业定期将旧文件移出被监视的目录,并假设它们已被处理。如果你想比这更小心,那么你必须实现自己的机制来跟踪处理工作的进度。上面提到的电子邮件线程包括一些关于如何做到这一点的想法。