How to read a file from inside a Flink FlatMapFunction



I'm building a Flink pipeline and, driven by the live input data, need to read records from archive files inside a RichFlatMapFunction (for example, each day I want to read the files from the previous day and the previous week). I'm wondering what the best way to do this is?

I could just use the Hadoop API directly, so that's what I'll try next.

Something like this:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataInputStream, FileSystem, Path}
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.util.Collector

class LoadHistory(
  var basePath: String,
  var pathTemplate: String,
) extends RichFlatMapFunction[(TypeAlias.GridId, TypeAlias.Timestamp), ArchiveRecord] {
  // see
  // https://programmerall.com/article/34422316834/
  // https://stackoverflow.com/questions/37085528/hadoop-with-binary-files
  // https://data-flair.training/blogs/hdfs-data-read-operation

  // Hadoop's FileSystem is not serializable, so keep it @transient lazy:
  // it is then created on the task manager rather than shipped with the function.
  @transient lazy val fileSystem = FileSystem.get(new Configuration())

  def formatPath(pathTemplate: String, gridId: TypeAlias.GridId, archiveDate: TypeAlias.Timestamp): String = ???

  override def flatMap(value: (TypeAlias.GridId, TypeAlias.Timestamp), out: Collector[ArchiveRecord]): Unit = {
    val pathStr = formatPath(pathTemplate, value._1, value._2)
    val path = new Path(pathStr)
    if (!fileSystem.exists(path)) {
      return
    }

    val in: FSDataInputStream = fileSystem.open(path)
    if (pathStr.endsWith(".protobuf")) {
      // TODO read file
    } else {
      assert(pathStr.endsWith(".lz4"))
      // TODO read file
    }
  }
}

I'm new to Hadoop, so I figure I'll need to configure it before reading data from cloud storage (e.g. replace new Configuration() above with something meaningful). I know Flink uses Hadoop internally to read files, so I'm wondering whether I can get at the configuration, or the already-configured HadoopFileSystem object, that Flink uses at runtime.
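For the "something meaningful" part, here is a minimal sketch of configuring the Hadoop side by hand, assuming S3 through the hadoop-aws s3a connector (the bucket name and the env-var credential passing are placeholders of mine; other cloud stores use different property keys):

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

val hadoopConf = new Configuration()
// Standard hadoop-aws keys; adjust for your storage provider.
hadoopConf.set("fs.s3a.access.key", sys.env("AWS_ACCESS_KEY_ID"))
hadoopConf.set("fs.s3a.secret.key", sys.env("AWS_SECRET_ACCESS_KEY"))

// Ask for the FileSystem matching the URI's scheme rather than the default FS.
val fs = FileSystem.get(new URI("s3a://my-archive-bucket/"), hadoopConf)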

Previously I tried kicking off a Flink batch job from inside the FlatMapFunction (ending in env.collect), but it seemed to cause the threads to lock up (job 2 wouldn't start until job 1 had finished).
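For reference, a rough reconstruction of that earlier attempt (from memory, assuming the DataSet API; parsing details omitted). DataSet.collect() submits the nested job and blocks the calling task thread until it finishes, which matches the lock-up I saw:

import org.apache.flink.api.scala._

val batchEnv = ExecutionEnvironment.getExecutionEnvironment
// collect() triggers execution and blocks until the nested job completes.
val lines: Seq[String] = batchEnv.readTextFile(pathStr).collect()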

I dug into the Flink source code and found a way to get an initialized org.apache.flink.core.fs.FileSystem object from an org.apache.flink.core.fs.Path. That can then be used to read the files:

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.core.fs.{FSDataInputStream, FileSystem, Path}
import org.apache.flink.util.Collector

class LoadHistory(
  var basePath: String,
  var pathTemplate: String,
) extends RichFlatMapFunction[(TypeAlias.GridId, TypeAlias.Timestamp), ArchiveRecord] {

  // Path.getFileSystem() returns the FileSystem that Flink has registered for
  // the path's scheme, configured the same way Flink itself reads files.
  // @transient lazy because FileSystem is not serializable.
  @transient lazy val fileSystem: FileSystem = new Path(basePath).getFileSystem()

  def formatPath(gridId: TypeAlias.GridId, archiveDate: TypeAlias.Timestamp): String = ???

  override def flatMap(value: (TypeAlias.GridId, TypeAlias.Timestamp), out: Collector[ArchiveRecord]): Unit = {
    val pathStr = formatPath(value._1, value._2)
    val path = new Path(pathStr)
    if (!fileSystem.exists(path)) {
      return
    }

    val in: FSDataInputStream = fileSystem.open(path)

    if (pathStr.endsWith(".protobuf")) {
      // TODO read file
    } else {
      assert(pathStr.endsWith(".lz4"))
      // TODO read file
    }
  }
}
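To fill in the two TODO branches, here is a hedged sketch under some assumptions of mine: that ArchiveRecord is protobuf-generated (so its companion has parseFrom, which both Java protobuf and ScalaPB emit), that each file holds a single serialized record, and that the .lz4 files are LZ4 frame streams, which commons-compress can unwrap:

import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream
import org.apache.flink.core.fs.FSDataInputStream

def readArchive(pathStr: String, in: FSDataInputStream): ArchiveRecord = {
  try {
    if (pathStr.endsWith(".protobuf")) {
      ArchiveRecord.parseFrom(in) // assumes protobuf-generated parseFrom
    } else {
      // Assumes LZ4 frame format; raw-block LZ4 needs a different decoder.
      ArchiveRecord.parseFrom(new FramedLZ4CompressorInputStream(in))
    }
  } finally {
    in.close()
  }
}

A nice property of going through Path.getFileSystem() is that the returned FileSystem honors whatever filesystem plugins and flink-conf settings the cluster was started with, which is exactly what I was after in the first place.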