Apache Flink 来自目录的流文件



我想按照给定的顺序将一堆csv文件加载到Apache Flink中,例如由文件名中的命名方案确定,其中可能包含一些时间戳信息。

在 Apache Spark 中,一旦文件移动到特定目录(例如/data/stage),我就可以将文件流式传输到数据集,原子文件移动

如下所示
Dataset<Row> fileStreamDf = spark.readStream()
.option("header", true)
.schema(schema)
.csv("/data/staging")

然后,我会按给定顺序(例如使用 bash 脚本)逐个移动文件到该暂存目录。

我怎样才能用 Apache Flink 实现同样的事情?

这不是完全相同的用例,但我们必须在流式处理作业中执行类似操作(文件HDF5CSV)。所以我写了一个RichSourceFunction,它知道如何以正确的顺序迭代文件,并将文件路径(这些在 S3 中)作为字符串记录发出。然后,下游FlatMapFunction分析文件并发出实际行。

相关内容

  • 没有找到相关文章

最新更新