我在Azure中有一个blob存储容器,我想将容器中的所有.csv文件加载到单个spark数据框架中。所有文件都有相同的前两列('name', 'time')。我对时间列进行了一些转换,将其转换为datetime字段,还根据文件名创建了一个新的id列,并将其移动为第一列。所有剩余的列都以命名格式组成,但是,有些文件比其他文件具有更多的附加列。例如:
一个文件可以像这样:
LV03最新的pyspark将具有以下功能:
df = df1.unionByName(df2, allowMissingColumns=True)
这应该将不同列的2个DataFrame统一起来。更多细节见API文档
我已经想出了一个解决办法。不使用infer_schema选项,我可以简单地手动设置模式,以便它包含每个文件中的所有列:
schema = StructType()
.add("name",StringType(),True)
.add("time",StringType(),True)
.add("LV01",StringType(),True)
.add("LV02",StringType(),True)
.add("LV03",StringType(),True)
.add("LV04",StringType(),True)
.add("LV05",StringType(),True)
.add("LV06",StringType(),True)
.add("LV07",StringType(),True)
#etc etc
一旦做出了选择,你可以通过模式选项为加载代码:
first_row_is_header = "true"
delimiter = ","
df = spark.read.format(file_type)
.option("header", first_row_is_header)
.option("sep", delimiter)
.schema(schema)
.load(file_location)
对于我的示例来说,这非常有效,尽管对于具有100列的文件,可能会有更有效的方法。不过,这对于我需要做的事情是有效的。