将CSV文件与不匹配的列相结合



我需要将多个CSV文件组合到一个对象中(我假设一个数据框(,但是它们都有不匹配的列,例如:

CSV A

store_location_key | product_key | collector_key | trans_dt | sales | units | trans_key

CSV B

collector_key | trans_dt | store_location_key | product_key | sales | units | trans_key

CSV C

collector_key | trans_dt | store_location_key |product_key | sales | units | trans_id

最重要的是,我需要与两个具有匹配列的其他CSV文件匹配:

位置CSV

store_location_key | region | province | city | postal_code | banner | store_num

产品CSV

product_key | sku | item_name | item_description | department | category

数据类型都是一致的,即销售列始终是float,store_location_key始终是int等。即使我首先将每个CSV转换为数据框,我不确定join是否有效(除了除外最后两个(由于列需要匹配的方式。

要合并前三个CSV文件,首先将其分别读取为dataframes,然后使用union。使用union很重要时的列和列数,因此首先需要在数据范围内添加所有丢失的列,然后使用select来确保列以相同的顺序。

all_columns = ['collector_key', 'trans_dt', 'store_location_key', 'product_key', 'sales', 'units', 'trans_key', 'trans_id']
dfA = (spark.read.csv("a.csv", header=True)
  .withColumn(trans_id, lit(null))
  .select(all_columns))
dfB = (spark.read.csv("b.csv", header=True)
  .withColumn(trans_id, lit(null))
  .select(all_columns))
dfC = (spark.read.csv("c.csv", header=True)
  .withColumn(trans_key, lit(null))
  .select(all_columns))
df = dfA.union(dfB).union(dfC)

注意:如果CSV文件的列/列数相同,则可以使用单个spark.read操作轻松组合它们。

合并前三个CSV后,其余的很容易。位置和产品CSV都可以使用join与其余的。

df_location = spark.read.csv("location.csv", header=True)
df_product = spark.read.csv("product.csv", header=True)
df2 = df.join(df_location, df.store_location_key == df_location.store_location_key)
df3 = df2.join(df_product, df2.product_key == df_product.product_key)

最新更新