从当前日期读取文件减去 Spark 中的 90 天



我正在逐个读取所有文件,这些文件作为YY=18/MM=12/DD=10存储在目录结构中,只需要读取current date minus 60 days。每天都会创建文件,也可能有一天文件不会创建。因此,当天文件夹不会创建。

我正在读取存储在目录结构中的所有文件,如YY/MM/DD。我正在下面编写代码,但它不起作用。

 var datecalculate = {
 var days = 0
 do{
 val start = DateTime.now
 var start1 = DateTime.now.minusDays(days)
 days = days + 1
 var start2 = start1.toString
 datecalculatenow(start2) }
while (days <= 90) 
}    

def datecalculatenow(start2:String):String={
var YY:String = start2.toString.substring(0,4)
var MM:String = start2.toString.substring(5,7)
var DD:String = start2.toString.substring(8,10)
 var datepath = "YYYY=" + YY +"/MM=" +MM +"/DD=" +DD
 var datepath1 = datepath.toString
 org.apache.spark.sql.SparkSession.read.option("delimiter","|").
 option("header","true").option("inferSchema","true").
 csv("/Table/Files" + datepath1  )
 }

我希望从当前日期减去 60 天读取每个文件,其目录结构为 YY/MM/DD

使用 spark sql,您可以在 select 语句中使用以下语句来减去 90 天;

date_sub(CAST(current_timestamp() as DATE), 90)

既然可以从路径列表生成数据帧,为什么不先生成路径列表。以下是从多个路径读取数据的简单简洁的方法:

val paths = (0 until 90).map(days => {
  val tmpDate = DateTime.now.minusDays(days).toString()
  val year = tmpDate.substring(0,4)
  val month = tmpDate.substring(5,7)
  val opdate = tmpDate.toString.substring(8,10)
  (s"basepath/YY=$year/MM=$month/DD=$opdate")
}).toList
val df = spark.read.
        option("delimiter", "|").
        option("header", "true").
        option("inferSchema","true")
        .csv(paths:_*)

生成 paths 时,可以过滤掉不存在的路径。我已经使用了一些经过修改的代码。我没有在本地设置中测试过,但想法是一样的。希望它能帮助你。

最新更新