我想按照给定的顺序将一堆csv文件加载到Apache Flink中,例如由文件名中的命名方案确定,其中可能包含一些时间戳信息。
在 Apache Spark 中,一旦文件移动到特定目录(例如/data/stage),我就可以将文件流式传输到数据集,原子文件移动
如下所示Dataset<Row> fileStreamDf = spark.readStream()
.option("header", true)
.schema(schema)
.csv("/data/staging")
然后,我会按给定顺序(例如使用 bash 脚本)逐个移动文件到该暂存目录。
我怎样才能用 Apache Flink 实现同样的事情?
这不是完全相同的用例,但我们必须在流式处理作业中执行类似操作(文件HDF5
不CSV
)。所以我写了一个RichSourceFunction
,它知道如何以正确的顺序迭代文件,并将文件路径(这些在 S3 中)作为字符串记录发出。然后,下游FlatMapFunction
分析文件并发出实际行。