How to compare two dataframes in PySpark without a unique/primary identifier



I am trying to compare two datasets derived from Hive and Snowflake. These datasets have no unique identifier or primary key defined, so the challenge is to validate each company transaction coming from Hive against the corresponding Snowflake row.

hive_df
+--------+-----------+-----+------+
|trans_id|  comp_name| dept|  date|
+--------+-----------+-----+------+
|    1101|Johnaon&jho| acnt|2-3-21|
|    1101|Johnaon&jho| acnt|2-3-21|
|    1102|    jeo_cop|sales|3-2-21|
|    1103|       star| acnt|4-2-21|
+--------+-----------+-----+------+

snowflake_df
+--------+-----------+-----+------+
|trans_id|  comp_name| dept|  date|
+--------+-----------+-----+------+
|    1101|Johnaon&jho| acnt|2-3-21|
|    1101|Johnaon&jho| acnt|2-3-21|
|    1102|    jeo_cop|sales|3-2-21|
|    1103|       star| acnt|4-2-21|
+--------+-----------+-----+------+

I have seen many examples that join the two dataframes on a unique id and compare the differences with PySpark, like:

joined_df = source_df.join(dest_df, 'key', 'full')
report = joined_df.filter(source_df[column] != dest_df[column])

But what is the solution when the datasets have no unique identifier, and how do I do the actual comparison? Any solution please.

I have used the following approach for years:
a union all, combined with a groupBy and conditional counts, gives a complete picture of the data and handles duplicates gracefully.

Generate sample dataframes

cols = ["trans_id", "comp_name", "dept", "date"]
hive_data = [
    ("1101", "Johnaon&jho", "acnt", "2-3-21"),
    ("1101", "Johnaon&jho", "acnt", "2-3-21"),
    ("1102", "jeo_cop", "sales", "3-2-21")]
snowflake_data = [
    ("1101", "Johnaon&jho", "acnt", "2-3-21"),
    ("1101", "Johnaon&jho", "acnt", "2-3-21"),
    ("1102", "jeo_cop", "sales", "3-2-21")]
hive_df = spark.createDataFrame(hive_data, cols)
snowflake_df = spark.createDataFrame(snowflake_data, cols)

hive_df.show()
+--------+-----------+-----+------+
|trans_id|  comp_name| dept|  date|
+--------+-----------+-----+------+
|    1101|Johnaon&jho| acnt|2-3-21|
|    1101|Johnaon&jho| acnt|2-3-21|
|    1102|    jeo_cop|sales|3-2-21|
+--------+-----------+-----+------+
snowflake_df.show()
+--------+-----------+-----+------+
|trans_id|  comp_name| dept|  date|
+--------+-----------+-----+------+
|    1101|Johnaon&jho| acnt|2-3-21|
|    1101|Johnaon&jho| acnt|2-3-21|
|    1102|    jeo_cop|sales|3-2-21|
+--------+-----------+-----+------+

Comparison based on df_unionall & groupBy

import pyspark.sql.functions as F

# Tag each row with its source system before unioning
hive_df_1 = hive_df.withColumn('source_system', F.lit('hive'))
snowflake_df_1 = snowflake_df.withColumn('source_system', F.lit('snow_flake'))

# Union the tagged dataframes, group on every original column,
# and count how often each row appears in each source
df_unionall = (hive_df_1
    .unionAll(snowflake_df_1)
    .groupBy(hive_df.columns)
    .agg(
        F.count("*").alias('total'),
        F.count(F.when(F.col('source_system') == "hive", 1)).alias('hive'),
        F.count(F.when(F.col('source_system') == "snow_flake", 1)).alias('snowflake')
    )
)
df_unionall.show()

+--------+-----------+-----+------+-----+----+---------+
|trans_id|  comp_name| dept|  date|total|hive|snowflake|
+--------+-----------+-----+------+-----+----+---------+
|    1101|Johnaon&jho| acnt|2-3-21|    4|   2|        2|
|    1102|    jeo_cop|sales|3-2-21|    2|   1|        1|
+--------+-----------+-----+------+-----+----+---------+
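
With the counts side by side, mismatches are easy to isolate. A minimal follow-up sketch (reusing the df_unionall built above) that keeps only the rows where the two systems disagree:

# Keep only rows whose per-source counts differ between Hive and Snowflake
mismatches = df_unionall.filter(F.col('hive') != F.col('snowflake'))
mismatches.show()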

You can use the subtract method. See: here

df = hive_df.subtract(snowflake_df)
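
Note that subtract works like SQL EXCEPT DISTINCT: it deduplicates and only returns rows of hive_df that are absent from snowflake_df. A small sketch of a two-way comparison that also preserves duplicates (assuming Spark 2.4+ for exceptAll):

# Rows in hive_df that are missing from snowflake_df, keeping duplicates
only_in_hive = hive_df.exceptAll(snowflake_df)
# Rows in snowflake_df that are missing from hive_df
only_in_snowflake = snowflake_df.exceptAll(hive_df)
only_in_hive.show()
only_in_snowflake.show()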
