我正在逐个读取所有文件,这些文件作为YY=18/MM=12/DD=10
存储在目录结构中,只需要读取current date minus 60 days
。每天都会创建文件,也可能有一天文件不会创建。因此,当天文件夹不会创建。
我正在读取存储在目录结构中的所有文件,如YY/MM/DD
。我正在下面编写代码,但它不起作用。
var datecalculate = {
var days = 0
do{
val start = DateTime.now
var start1 = DateTime.now.minusDays(days)
days = days + 1
var start2 = start1.toString
datecalculatenow(start2) }
while (days <= 90)
}
def datecalculatenow(start2:String):String={
var YY:String = start2.toString.substring(0,4)
var MM:String = start2.toString.substring(5,7)
var DD:String = start2.toString.substring(8,10)
var datepath = "YYYY=" + YY +"/MM=" +MM +"/DD=" +DD
var datepath1 = datepath.toString
org.apache.spark.sql.SparkSession.read.option("delimiter","|").
option("header","true").option("inferSchema","true").
csv("/Table/Files" + datepath1 )
}
我希望从当前日期减去 60 天读取每个文件,其目录结构为 YY/MM/DD
使用 spark sql,您可以在 select 语句中使用以下语句来减去 90 天;
date_sub(CAST(current_timestamp() as DATE), 90)
既然可以从路径列表生成数据帧,为什么不先生成路径列表。以下是从多个路径读取数据的简单简洁的方法:
val paths = (0 until 90).map(days => {
val tmpDate = DateTime.now.minusDays(days).toString()
val year = tmpDate.substring(0,4)
val month = tmpDate.substring(5,7)
val opdate = tmpDate.toString.substring(8,10)
(s"basepath/YY=$year/MM=$month/DD=$opdate")
}).toList
val df = spark.read.
option("delimiter", "|").
option("header", "true").
option("inferSchema","true")
.csv(paths:_*)
生成 paths
时,可以过滤掉不存在的路径。我已经使用了一些经过修改的代码。我没有在本地设置中测试过,但想法是一样的。希望它能帮助你。