Every valid column in the CSV files is followed by a NULL column; how do I remove these columns and reload the CSV?



Here is a sample of the CSV data:

"ID", "name", "abbreviation", "CreatedTime", "CreatedByAccount", "UpdatedTime", "UpdatedByAccount", "inc_country_id", "time_zone_id"
"1","NULL","UNITED ARAB EMIRATES"",NULL","AE","NULL","2015-07-01 20:41:49","NULL","379","NULL","2016-03-16 07:38:49","NULL","8215","NULL","262","NULL","9","NULL"

When I try to create a DataFrame with PySpark, this causes a column mismatch. There are roughly 600+ files like this with the same kind of data, and I need to read all of them with the correct column mapping.

>>> df=spark.read.csv("s3://xyz.csv",header=True)
>>> df.show()                                                                   
+---+----+--------------------+-----------+----------------+-----------+-------------------+--------------+------------+
| ID|name|        abbreviation|CreatedTime|CreatedByAccount|UpdatedTime|   UpdatedByAccount|inc_country_id|time_zone_id|
+---+----+--------------------+-----------+----------------+-----------+-------------------+--------------+------------+
|  1|NULL|UNITED ARAB EMIRATES|       NULL|              AE|       NULL|2015-07-01 20:41:49|          NULL|         379|
|  2|NULL|           ARGENTINA|       NULL|              AR|       NULL|2015-07-01 20:41:49|          NULL|         379|

I tried one approach of building a custom schema and reading the CSV file with it, but this would have to be done for 600+ files, each with a different size and set of columns.

>>> from pyspark.sql.types import StructType, StructField, StringType
>>> abc=StructType([StructField('ID',StringType(),True),StructField('c1',StringType(),True),StructField('name',StringType(),True),StructField('c2',StringType(),True),StructField('abbreviation',StringType(),True),StructField('c3',StringType(),True),StructField('CreatedTime',StringType(),True),StructField('c4',StringType(),True),StructField('CreatedByAccount',StringType(),True),StructField('c5',StringType(),True),StructField('UpdatedTime',StringType(),True),StructField('c6',StringType(),True),StructField('UpdatedByAccount',StringType(),True),StructField('c7',StringType(),True),StructField('inc_country_id',StringType(),True),StructField('c8',StringType(),True),StructField('time_zone_id',StringType(),True),StructField('c9',StringType(),True)])
>>> df=spark.read.csv("s3://xyz.csv/",schema=abc)
>>> df.show()
+---+----+--------------------+-----------+----------------+-----------+-------------------+--------------+----------------+----+-------------------+----+----------------+----+--------------+----+------------+----+
| ID|  c1|                name|         c2|    abbreviation|         c3|        CreatedTime|            c4|CreatedByAccount|  c5|        UpdatedTime|  c6|UpdatedByAccount|  c7|inc_country_id|  c8|time_zone_id|  c9|
+---+----+--------------------+-----------+----------------+-----------+-------------------+--------------+----------------+----+-------------------+----+----------------+----+--------------+----+------------+----+
|  1|NULL|UNITED ARAB EMIRATES|       NULL|              AE|       NULL|2015-07-01 20:41:49|          NULL|             379|NULL|2016-03-16 07:38:49|NULL|            8215|NULL|           262|NULL|           9|NULL|
|  2|NULL|           ARGENTINA|       NULL|              AR|       NULL|2015-07-01 20:41:49|          NULL|             379|NULL|2015-10-28 21:07:47|NULL|             379|NULL|           187|NULL|        None|NULL|

Is there a generic way to reload all of these files without the NULL columns using PySpark?

My solution is to read each file twice: once to get the schema (and then manipulate it), and once for the actual read.

from pyspark.sql import types as T

# keep original fields so we can `select` later
df_schema = spark.read.csv('a.csv', header=True)
original_fields = df_schema.schema.fields
# add an extra dummy column after each valid column
expanded_fields = []
for i, field in enumerate(original_fields):
    expanded_fields.append(field)
    expanded_fields.append(T.StructField(f'col_{i}', T.StringType()))
# build a "fake" schema to fit the csv
schema = T.StructType(expanded_fields)
# use the "fake" schema to load the CSV, then select only the valid columns from the original fields
df = spark.read.csv('a.csv', header=True, schema=schema).select([field.name for field in original_fields])
df.show()
# +---+--------------------+------------+-------------------+----------------+-------------------+----------------+--------------+------------+
# | ID|                name|abbreviation|        CreatedTime|CreatedByAccount|        UpdatedTime|UpdatedByAccount|inc_country_id|time_zone_id|
# +---+--------------------+------------+-------------------+----------------+-------------------+----------------+--------------+------------+
# |  1|UNITED ARAB EMIRATES|          AE|2015-07-01 20:41:49|             379|2016-03-16 07:38:49|            8215|           262|           9|
# +---+--------------------+------------+-------------------+----------------+-------------------+----------------+--------------+------------+
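Since the 600+ files each have their own column lists, the same two-pass read can be wrapped in a small helper and applied file by file. Below is a minimal sketch under that assumption; the S3 paths are placeholders and `spark` is an existing SparkSession:

from pyspark.sql import types as T

def load_without_dummy_columns(path):
    # first pass: only the header matters here, it gives us the real column names
    original_fields = spark.read.csv(path, header=True).schema.fields
    # interleave a throwaway string field after every real field
    expanded_fields = []
    for i, field in enumerate(original_fields):
        expanded_fields.append(field)
        expanded_fields.append(T.StructField(f'dummy_{i}', T.StringType()))
    # second pass: read with the widened schema, then keep only the real columns
    widened = T.StructType(expanded_fields)
    return spark.read.csv(path, header=True, schema=widened).select(
        [field.name for field in original_fields])

# placeholder paths; replace with however the 600+ files are enumerated
paths = ['s3://bucket/countries.csv', 's3://bucket/regions.csv']
frames = {p: load_without_dummy_columns(p) for p in paths}
frames['s3://bucket/countries.csv'].show()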
