将多个具有不同列数的csv文件读取到数据库块中的单个spark数据框中



我在Azure中有一个blob存储容器,我想将容器中的所有.csv文件加载到单个spark数据框架中。所有文件都有相同的前两列('name', 'time')。我对时间列进行了一些转换,将其转换为datetime字段,还根据文件名创建了一个新的id列,并将其移动为第一列。所有剩余的列都以命名格式组成,但是,有些文件比其他文件具有更多的附加列。例如:

一个文件可以像这样:

LV0301/01/1900 01:00:0047.9623.1043.00

最新的pyspark将具有以下功能:

df = df1.unionByName(df2, allowMissingColumns=True)

这应该将不同列的2个DataFrame统一起来。更多细节见API文档

我已经想出了一个解决办法。不使用infer_schema选项,我可以简单地手动设置模式,以便它包含每个文件中的所有列:

schema = StructType() 
.add("name",StringType(),True) 
.add("time",StringType(),True) 
.add("LV01",StringType(),True) 
.add("LV02",StringType(),True) 
.add("LV03",StringType(),True) 
.add("LV04",StringType(),True) 
.add("LV05",StringType(),True) 
.add("LV06",StringType(),True) 
.add("LV07",StringType(),True) 
#etc etc

一旦做出了选择,你可以通过模式选项为加载代码:

first_row_is_header = "true"
delimiter = ","
df = spark.read.format(file_type) 
.option("header", first_row_is_header) 
.option("sep", delimiter) 
.schema(schema) 
.load(file_location)

对于我的示例来说,这非常有效,尽管对于具有100列的文件,可能会有更有效的方法。不过,这对于我需要做的事情是有效的。

相关内容

  • 没有找到相关文章

最新更新