Apache Flink-s3文件夹监控-许多文件丢失



大家好,

我有一个Flink作业,它有一个S3文件夹作为源,我们不断地将数千个小的(每个大约1KB(gzip文件放入该文件夹中,速度大约为每分钟5000个文件。以下是我如何在Scala:中创建源代码

val my_input_format = new TextInputFormat(
new org.apache.flink.core.fs.Path(my_path))
my_input_format.setFilesFilter(FilePathFilter.createDefaultFilter())
my_input_format.setNestedFileEnumeration(true)
val my_raw_stream = streamEnv
.readFile(my_input_format,
my_path,
FileProcessingMode.PROCESS_CONTINUOUSLY,
1000)

问题是,在上述1000ms的监控间隔下,大约20%的文件被遗漏。在Apache Flink Dashboard中,在随后的操作员中,我只能看到记录的文件总数的约80%("发送的记录"列(。

如果我增加监视间隔,丢失的文件数量就会减少。在5000毫秒时,这一比例约为10%,在30000毫秒时,只有约2%未命中。

但未记录警告/错误。

我无法在HDFS中模拟这一点,因为在我们的集群中我无法达到那么高的文件写入速度。

有人能帮忙吗。非常感谢。

AmazonS3为列出目录提供了最终的一致性(请参阅此问题(。

监控源列出目录中的文件,并通过记住其最大修改时间戳来跟踪处理的文件。由于S3列表不能保证立即一致,因此最大修改时间戳可能会提前,时间戳较小的文件可能会丢失。

我认为这个问题不能通过增加监测间隔来完全解决。相反,我们需要一个额外的参数,为最大时间戳添加一个偏移量。如果你能通过邮件列表或打开Jira门票联系Flink社区,那就太好了。

====================更新=============

我已经根据法比安的建议实施了这一改变。就功能而言,它已经完成并运行。需要花费更多的时间来编写正确的单元测试/文档。我的实现在这里

相关内容

  • 没有找到相关文章

最新更新