大家好,
我有一个Flink作业,它有一个S3文件夹作为源,我们不断地将数千个小的(每个大约1KB(gzip文件放入该文件夹中,速度大约为每分钟5000个文件。以下是我如何在Scala:中创建源代码
val my_input_format = new TextInputFormat(
new org.apache.flink.core.fs.Path(my_path))
my_input_format.setFilesFilter(FilePathFilter.createDefaultFilter())
my_input_format.setNestedFileEnumeration(true)
val my_raw_stream = streamEnv
.readFile(my_input_format,
my_path,
FileProcessingMode.PROCESS_CONTINUOUSLY,
1000)
问题是,在上述1000ms的监控间隔下,大约20%的文件被遗漏。在Apache Flink Dashboard中,在随后的操作员中,我只能看到记录的文件总数的约80%("发送的记录"列(。
如果我增加监视间隔,丢失的文件数量就会减少。在5000毫秒时,这一比例约为10%,在30000毫秒时,只有约2%未命中。
但未记录警告/错误。
我无法在HDFS中模拟这一点,因为在我们的集群中我无法达到那么高的文件写入速度。
有人能帮忙吗。非常感谢。
AmazonS3为列出目录提供了最终的一致性(请参阅此问题(。
监控源列出目录中的文件,并通过记住其最大修改时间戳来跟踪处理的文件。由于S3列表不能保证立即一致,因此最大修改时间戳可能会提前,时间戳较小的文件可能会丢失。
我认为这个问题不能通过增加监测间隔来完全解决。相反,我们需要一个额外的参数,为最大时间戳添加一个偏移量。如果你能通过邮件列表或打开Jira门票联系Flink社区,那就太好了。
====================更新=============
我已经根据法比安的建议实施了这一改变。就功能而言,它已经完成并运行。需要花费更多的时间来编写正确的单元测试/文档。我的实现在这里