Reading data from S3 when partitions have different columns



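I have partitioned data in S3, and each partition has a different number of columns, as shown below. When I read the data in PySpark and try to print the schema, I only get the columns that are common to all partitions, not every column. What is the best way to read all the columns and rename a few of them?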

aws s3 ls s3://my-bkt/test_data/
PRE occ_dt=20210426/
PRE occ_dt=20210428/
PRE occ_dt=20210429/
PRE occ_dt=20210430/
PRE occ_dt=20210503/
PRE occ_dt=20210504/

spark.read.parquet("s3://my-bkt/test_data/").printSchema()
|-- map_api__450jshb457: string (nullable = true)
|-- customer_id: string (nullable = true)
|-- first_name: string (nullable = true)
|-- map_api_592yd749dn: string (nullable = true)
|-- last_name: string (nullable = true)
|-- map_api_has_join: string (nullable = true)

# When I read partition 20210504
spark.read.parquet("s3://my-bkt/test_data/occ_dt=20210504/").printSchema()
|-- map_api__450jshb457: string (nullable = true)
|-- customer_id: string (nullable = true)
|-- first_name: string (nullable = true)
|-- map_api_592yd749dn: string (nullable = true)
|-- last_name: string (nullable = true)
|-- map_api_has_join: string (nullable = true)
|-- cust_activity: string (nullable = true)
|-- map_api__592rtddvid: string (nullable = true)

# When I read partition 20210503
spark.read.parquet("s3://my-bkt/test_data/occ_dt=20210503/").printSchema()
|-- map_api__450jshb457: string (nullable = true)
|-- customer_id: string (nullable = true)
|-- first_name: string (nullable = true)
|-- map_api_592yd749dn: string (nullable = true)
|-- last_name: string (nullable = true)
|-- map_api_4js3nnju8572d93: string (nullable = true)
|-- map_api_58943h64u47v: string (nullable = true)
|-- map_api__58943h6220dh: string (nullable = true)

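As shown above, partitions 20210503 and 20210504 have more columns than the other partitions. When I read the S3 bucket to get the schema, only the fields common to all partitions are shown. I want every field to be returned when I read the S3 location, giving the expected result below.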

Expected output:
spark.read.parquet("s3://my-bkt/test_data/").printSchema()
|-- map_api__450jshb457: string (nullable = true)
|-- customer_id: string (nullable = true)
|-- first_name: string (nullable = true)
|-- map_api_592yd749dn: string (nullable = true)
|-- last_name: string (nullable = true)
|-- map_api_has_join: string (nullable = true)
|-- map_api_4js3nnju8572d93: string (nullable = true)
|-- map_api_58943h64u47v: string (nullable = true)
|-- map_api__58943h6220dh: string (nullable = true)
|-- cust_activity: string (nullable = true)
|-- map_api__592rtddvid: string (nullable = true)

Thanks in advance!

Adding the mergeSchema option to the read returns the full schema:

spark.read.option("mergeSchema", "true").parquet("s3://my-bkt/test_data/").printSchema()
|-- map_api__450jshb457: string (nullable = true)
|-- customer_id: string (nullable = true)
|-- first_name: string (nullable = true)
|-- map_api_592yd749dn: string (nullable = true)
|-- last_name: string (nullable = true)
|-- map_api_has_join: string (nullable = true)
|-- map_api_4js3nnju8572d93: string (nullable = true)
|-- map_api_58943h64u47v: string (nullable = true)
|-- map_api__58943h6220dh: string (nullable = true)
|-- cust_activity: string (nullable = true)
|-- map_api__592rtddvid: string (nullable = true)
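
The question also asks about renaming a few columns after reading. One possible approach is sketched below, under some assumptions: the helper names `renamed_columns` and `read_all_columns` and the target names in `RENAME_MAP` are hypothetical and chosen only for illustration, and `spark` is assumed to be an existing SparkSession. Only `option("mergeSchema", "true")`, `parquet`, `columns` and `toDF` are PySpark calls.

```python
# Hypothetical mapping from the opaque source names to readable ones.
# The target names are invented for this example.
RENAME_MAP = {
    "map_api__450jshb457": "api_field_a",
    "map_api_592yd749dn": "api_field_b",
}


def renamed_columns(columns, rename_map):
    """Return the column names in order, replacing those found in rename_map."""
    return [rename_map.get(name, name) for name in columns]


def read_all_columns(spark, path, rename_map):
    """Read every partition under `path` with a merged schema, then rename.

    `spark` is assumed to be an existing SparkSession.
    """
    # mergeSchema asks Spark to combine the Parquet schemas of all files
    # rather than relying on the schema of only some of them.
    df = spark.read.option("mergeSchema", "true").parquet(path)
    # toDF(*names) assigns new names to all columns positionally.
    return df.toDF(*renamed_columns(df.columns, rename_map))
```

It could be called as read_all_columns(spark, "s3://my-bkt/test_data/", RENAME_MAP).printSchema(). Rows from partitions that lack a given column will hold null in that column. Columns not listed in the mapping keep their original names.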
