Here is a sample of the CSV data:
"ID", "name", "abbreviation", "CreatedTime", "CreatedByAccount", "UpdatedTime", "UpdatedByAccount", "inc_country_id", "time_zone_id"
"1","NULL","UNITED ARAB EMIRATES"",NULL","AE","NULL","2015-07-01 20:41:49","NULL","379","NULL","2016-03-16 07:38:49","NULL","8215","NULL","262","NULL","9","NULL"
When I try to create a DataFrame from this with PySpark, the columns do not line up, because every real value is followed by an extra literal "NULL" field. There are around 600+ files with data like this, and I need to read all of them with the correct column mapping.
>>> df=spark.read.csv("s3://xyz.csv",header=True)
>>> df.show()
+---+----+--------------------+-----------+----------------+-----------+-------------------+--------------+------------+
| ID|name| abbreviation|CreatedTime|CreatedByAccount|UpdatedTime| UpdatedByAccount|inc_country_id|time_zone_id|
+---+----+--------------------+-----------+----------------+-----------+-------------------+--------------+------------+
| 1|NULL|UNITED ARAB EMIRATES| NULL| AE| NULL|2015-07-01 20:41:49| NULL| 379|
| 2|NULL| ARGENTINA| NULL| AR| NULL|2015-07-01 20:41:49| NULL| 379|
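For reference, the mismatch can be confirmed by counting fields in the raw lines. A minimal sketch (it assumes the first two rows returned by spark.read.text are the header line and the first data row, and reuses the path from above):

import csv

# sketch: compare the header's column count with the first data row's field
# count; for the sample above this prints 9 and 18, because every real value
# is followed by an extra literal "NULL" field
lines = [row.value for row in spark.read.text("s3://xyz.csv").take(2)]
print(len(next(csv.reader([lines[0]]))), len(next(csv.reader([lines[1]]))))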
I tried defining a custom schema and reading the CSV with it, but this would have to be repeated for each of the 600+ files, which have different sizes and column sets.
>>> from pyspark.sql.types import StructType, StructField, StringType
>>> abc=StructType([StructField('ID',StringType(),True),StructField('c1',StringType(),True),
...     StructField('name',StringType(),True),StructField('c2',StringType(),True),
...     StructField('abbreviation',StringType(),True),StructField('c3',StringType(),True),
...     StructField('CreatedTime',StringType(),True),StructField('c4',StringType(),True),
...     StructField('CreatedByAccount',StringType(),True),StructField('c5',StringType(),True),
...     StructField('UpdatedTime',StringType(),True),StructField('c6',StringType(),True),
...     StructField('UpdatedByAccount',StringType(),True),StructField('c7',StringType(),True),
...     StructField('inc_country_id',StringType(),True),StructField('c8',StringType(),True),
...     StructField('time_zone_id',StringType(),True),StructField('c9',StringType(),True)])
>>> df=spark.read.csv("s3://xyz.csv/",schema=abc)
>>> df.show()
+---+----+--------------------+-----------+----------------+-----------+-------------------+--------------+----------------+----+-------------------+----+----------------+----+--------------+----+------------+----+
| ID| c1| name| c2| abbreviation| c3| CreatedTime| c4|CreatedByAccount| c5| UpdatedTime| c6|UpdatedByAccount| c7|inc_country_id| c8|time_zone_id| c9|
+---+----+--------------------+-----------+----------------+-----------+-------------------+--------------+----------------+----+-------------------+----+----------------+----+--------------+----+------------+----+
| 1|NULL|UNITED ARAB EMIRATES| NULL| AE| NULL|2015-07-01 20:41:49| NULL| 379|NULL|2016-03-16 07:38:49|NULL| 8215|NULL| 262|NULL| 9|NULL|
| 2|NULL| ARGENTINA| NULL| AR| NULL|2015-07-01 20:41:49| NULL| 379|NULL|2015-10-28 21:07:47|NULL| 379|NULL| 187|NULL| None|NULL|
Is there a generic way to reload all of these files without the NULLs using PySpark?
My solution is to read each file twice: once to get the schema (which is then modified), and a second time for the actual load.
from pyspark.sql import types as T

# keep the original fields so we can `select` them later
df_schema = spark.read.csv('a.csv', header=True)
original_fields = df_schema.schema.fields

# add an extra dummy column after each valid column
expanded_fields = []
for i, field in enumerate(original_fields):
    expanded_fields.append(field)
    expanded_fields.append(T.StructField(f'col_{i}', T.StringType()))

# build a "fake" schema that matches the actual CSV layout
schema = T.StructType(expanded_fields)

# load the CSV with the "fake" schema, then select only the valid columns
df = spark.read.csv('a.csv', header=True, schema=schema).select([field.name for field in original_fields])
df.show()
# +---+--------------------+------------+-------------------+----------------+-------------------+----------------+--------------+------------+
# | ID| name|abbreviation| CreatedTime|CreatedByAccount| UpdatedTime|UpdatedByAccount|inc_country_id|time_zone_id|
# +---+--------------------+------------+-------------------+----------------+-------------------+----------------+--------------+------------+
# | 1|UNITED ARAB EMIRATES| AE|2015-07-01 20:41:49| 379|2016-03-16 07:38:49| 8215| 262| 9|
# +---+--------------------+------------+-------------------+----------------+-------------------+----------------+--------------+------------+
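To cover the 600+ files, the same two-pass read can be wrapped in a small helper and applied per file, since each file has its own header and column set. A minimal sketch (the helper name and the path list below are hypothetical, not from the original post):

from pyspark.sql import types as T

def read_interleaved_csv(spark, path):
    # pass 1: read only to learn the real column names
    original_fields = spark.read.csv(path, header=True).schema.fields
    # pass 2: widen the schema with a dummy column after each real one,
    # then keep only the real columns
    expanded_fields = []
    for i, field in enumerate(original_fields):
        expanded_fields.append(field)
        expanded_fields.append(T.StructField(f'dummy_{i}', T.StringType()))
    schema = T.StructType(expanded_fields)
    return (spark.read.csv(path, header=True, schema=schema)
            .select([field.name for field in original_fields]))

# hypothetical list of the 600+ S3 paths
paths = ["s3://bucket/countries.csv", "s3://bucket/time_zones.csv"]
dfs = {path: read_interleaved_csv(spark, path) for path in paths}

The schema pass has to run once per file because the column sets differ between files; since inferSchema is not enabled, that first read only needs the header line to build an all-string schema, so the extra pass stays cheap.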