我需要将多个CSV文件组合到一个对象中(我假设一个数据框(,但是它们都有不匹配的列,例如:
CSV A
store_location_key | product_key | collector_key | trans_dt | sales | units | trans_key
CSV B
collector_key | trans_dt | store_location_key | product_key | sales | units | trans_key
CSV C
collector_key | trans_dt | store_location_key |product_key | sales | units | trans_id
最重要的是,我需要与两个具有匹配列的其他CSV文件匹配:
位置CSV
store_location_key | region | province | city | postal_code | banner | store_num
产品CSV
product_key | sku | item_name | item_description | department | category
数据类型都是一致的,即销售列始终是float,store_location_key始终是int等。即使我首先将每个CSV转换为数据框,我不确定join
是否有效(除了除外最后两个(由于列需要匹配的方式。
要合并前三个CSV文件,首先将其分别读取为dataframes,然后使用union
。使用union
很重要时的列和列数,因此首先需要在数据范围内添加所有丢失的列,然后使用select
来确保列以相同的顺序。
all_columns = ['collector_key', 'trans_dt', 'store_location_key', 'product_key', 'sales', 'units', 'trans_key', 'trans_id']
dfA = (spark.read.csv("a.csv", header=True)
.withColumn(trans_id, lit(null))
.select(all_columns))
dfB = (spark.read.csv("b.csv", header=True)
.withColumn(trans_id, lit(null))
.select(all_columns))
dfC = (spark.read.csv("c.csv", header=True)
.withColumn(trans_key, lit(null))
.select(all_columns))
df = dfA.union(dfB).union(dfC)
注意:如果CSV文件的列/列数相同,则可以使用单个spark.read
操作轻松组合它们。
合并前三个CSV后,其余的很容易。位置和产品CSV都可以使用join
与其余的。
df_location = spark.read.csv("location.csv", header=True)
df_product = spark.read.csv("product.csv", header=True)
df2 = df.join(df_location, df.store_location_key == df_location.store_location_key)
df3 = df2.join(df_product, df2.product_key == df_product.product_key)