PySpark: joining multiple DataFrames



I am new to Spark and the big data world. I used some Airflow DAGs to move a MySQL database into HDFS, so each table now sits in HDFS as a Parquet file. Now I need to translate the query below into PySpark DataFrame operations (a sketch of how the Parquet tables could be loaded as DataFrames follows the query).

SELECT PV.id product_id,
ZP.vendor_id vendor_id,
V.title vendor_name,
PV.barcode barcode,
PV.title product_title,
ZP.active product_active,
ZP.price product_price,
ZP.capacity product_capacity,
ZP.stock product_stock,
MC1.title subcat_title,
MC2.title parent_category_title,
ZB.title brand_name
FROM xpediaProductVariationVendorInfo ZP
JOIN ProductVariations PV ON PV.id = ZP.xpediaProductVariation_id
JOIN Vendors V ON ZP.vendor_id = V.id
JOIN VendorTypes vt ON V.vendor_type_id = vt.id
JOIN xpediaProductVariation ZPV ON ZPV.id = PV.id
JOIN MenuCategories MC1 ON PV.menu_category_id = MC1.id
LEFT JOIN MenuCategories MC2 ON MC1.parent_id = MC2.id
LEFT JOIN xpedia_brand ZB ON ZB.id = ZPV.brand_id
WHERE ZP.vendor_id={}
AND V.status not in ('Suspend')
GROUP BY PV.id,
ZP.vendor_id;      
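For reference, each of those Parquet tables would first be read back into a DataFrame; a minimal sketch, assuming hypothetical HDFS paths under /warehouse/ (point them at wherever the Airflow DAGs actually wrote the files):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("vendor_product_report").getOrCreate()

# Paths are placeholders, not the real ingestion layout
df_xpediaProductVariationVendorInfo = spark.read.parquet("/warehouse/xpediaProductVariationVendorInfo")
df_ProductVariations = spark.read.parquet("/warehouse/ProductVariations")
df_Vendors = spark.read.parquet("/warehouse/Vendors")
df_VendorTypes = spark.read.parquet("/warehouse/VendorTypes")
df_xpediaProductVariation = spark.read.parquet("/warehouse/xpediaProductVariation")
df_MenuCategories = spark.read.parquet("/warehouse/MenuCategories")
df_xpedia_brand = spark.read.parquet("/warehouse/xpedia_brand")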

There is a lot of logic here, and your GROUP BY does not match the selected columns (12 columns are selected but only 2 are grouped). In PySpark you would have to rewrite it along the lines of the following -


import pyspark.sql.functions as F

df_output = (df_xpediaProductVariationVendorInfo.alias("ZP")
    .join(df_ProductVariations.alias("PV"), F.col("PV.id") == F.col("ZP.xpediaProductVariation_id"))
    .join(df_Vendors.alias("V"), F.col("ZP.vendor_id") == F.col("V.id"))
    .join(df_VendorTypes.alias("vt"), F.col("V.vendor_type_id") == F.col("vt.id"))
    .join(df_xpediaProductVariation.alias("ZPV"), F.col("ZPV.id") == F.col("PV.id"))
    .join(df_MenuCategories.alias("MC1"), F.col("PV.menu_category_id") == F.col("MC1.id"))
    .join(df_MenuCategories.alias("MC2"), F.col("MC1.parent_id") == F.col("MC2.id"), "left")
    .join(df_xpedia_brand.alias("ZB"), F.col("ZB.id") == F.col("ZPV.brand_id"), "left")
    # {} is the vendor_id placeholder carried over from the original query; substitute the actual value
    .where((F.col("ZP.vendor_id") == {})
           & ~(F.col("V.status").isin('Suspend')))
    # aliases reproduce the output column names of the SQL SELECT list
    .select(F.col("PV.id").alias("product_id"),
            F.col("ZP.vendor_id").alias("vendor_id"),
            F.col("V.title").alias("vendor_name"),
            F.col("PV.barcode").alias("barcode"),
            F.col("PV.title").alias("product_title"),
            F.col("ZP.active").alias("product_active"),
            F.col("ZP.price").alias("product_price"),
            F.col("ZP.capacity").alias("product_capacity"),
            F.col("ZP.stock").alias("product_stock"),
            F.col("MC1.title").alias("subcat_title"),
            F.col("MC2.title").alias("parent_category_title"),
            F.col("ZB.title").alias("brand_name"))
    )
df_output.show()
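Note that the PySpark version above drops the GROUP BY entirely. If the MySQL GROUP BY PV.id, ZP.vendor_id was only there to collapse duplicate rows per product/vendor pair (MySQL permits selecting non-aggregated columns in that case; Spark SQL does not), one way to approximate it is a dropDuplicates on those two keys; a minimal sketch, assuming deduplication is really all that is needed:

# Keeps one arbitrary row per (product_id, vendor_id) pair, mirroring
# MySQL's permissive GROUP BY rather than performing a real aggregation
df_output_dedup = df_output.dropDuplicates(["product_id", "vendor_id"])
df_output_dedup.show()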
