Spark:如何在过去10天内从S3获取最新文件



我试图在输入中没有文件的过去10天内从S3中获取最新文件。问题是路径包含日期。

我的道路就是这样:

val path = "s3://bucket-info/folder1/folder2"
val date = "2019/04/12"    ## YYYY/MM/DD

我正在这样做=

 val update_path = path+"/" +date //this will become s3://bucket-info/folder1/folder2/2019/04/12 

def fileExist(path: String, sc: SparkContext): Boolean = FileSystem.get(getS3OrFileUri(path),
  sc.hadoopConfiguration).exists(new Path(path + "/_SUCCESS"))

if (fileExist(update_path, sc)) {
    //read and process the file
} else {
       log("File not exist")
       // I need to get the latest file in the last five days and use. So that I can check "s3://bucket-info/folder1/folder2/2019/04/11" , s3://bucket-info/folder1/folder2/2019/04/10 and others. If no latest file in last 5 days. throw error. s
}

但是我的问题是如何检查月底何时?我可以做循环,但是有没有优化,优雅的方法可以在Spark中进行?

不是很最佳,但是如果您想利用火花,数据框架读取器可以采用多个路径,而input_file_name为您提供了路径:

val path = "s3://bucket-info/folder1/folder2"
val date = "2019/04/12"
val fmt = DateTimeFormatter.ofPattern("yyyy/MM/dd")
val end = LocalDate.parse(date, fmt)
val prefixes = (0 until 10).map(end.minusDays(_)).map(d => s"$path/${fmt.format(d)}")
val prefix = spark.read
  .textFile(prefixes:_*)
  .select(input_file_name() as "file")
  .distinct()
  .orderBy(desc("file"))
  .limit(1)
  .collect().collectFirst {
  case Row(prefix: String) => prefix
}
prefix.fold {
  // log error
}
{ path =>
  //read and process the file
}

这是非常效率的,没有明确的方法可以使用SPARK,因为使用递归结构S3 Hadoop文件系统实现不是很有效。如果您愿意直接使用S3 API,则可以将s"$path/${fmt.format(end.minusDays(10))}"设置为参数之后的开始,并使用类似的内容列出键。这可以正常返回按字母顺序排序的密钥列表,并且在日期键中零填充。

最新更新