我试图在输入中没有文件的过去10天内从S3中获取最新文件。问题是路径包含日期。
我的道路就是这样:
val path = "s3://bucket-info/folder1/folder2"
val date = "2019/04/12" ## YYYY/MM/DD
我正在这样做=
val update_path = path+"/" +date //this will become s3://bucket-info/folder1/folder2/2019/04/12
def fileExist(path: String, sc: SparkContext): Boolean = FileSystem.get(getS3OrFileUri(path),
sc.hadoopConfiguration).exists(new Path(path + "/_SUCCESS"))
if (fileExist(update_path, sc)) {
//read and process the file
} else {
log("File not exist")
// I need to get the latest file in the last five days and use. So that I can check "s3://bucket-info/folder1/folder2/2019/04/11" , s3://bucket-info/folder1/folder2/2019/04/10 and others. If no latest file in last 5 days. throw error. s
}
但是我的问题是如何检查月底何时?我可以做循环,但是有没有优化,优雅的方法可以在Spark中进行?
不是很最佳,但是如果您想利用火花,数据框架读取器可以采用多个路径,而input_file_name
为您提供了路径:
val path = "s3://bucket-info/folder1/folder2"
val date = "2019/04/12"
val fmt = DateTimeFormatter.ofPattern("yyyy/MM/dd")
val end = LocalDate.parse(date, fmt)
val prefixes = (0 until 10).map(end.minusDays(_)).map(d => s"$path/${fmt.format(d)}")
val prefix = spark.read
.textFile(prefixes:_*)
.select(input_file_name() as "file")
.distinct()
.orderBy(desc("file"))
.limit(1)
.collect().collectFirst {
case Row(prefix: String) => prefix
}
prefix.fold {
// log error
}
{ path =>
//read and process the file
}
这是非常效率的,没有明确的方法可以使用SPARK,因为使用递归结构S3 Hadoop文件系统实现不是很有效。如果您愿意直接使用S3 API,则可以将s"$path/${fmt.format(end.minusDays(10))}"
设置为参数之后的开始,并使用类似的内容列出键。这可以正常返回按字母顺序排序的密钥列表,并且在日期键中零填充。