I am trying to compare two datasets derived from Hive and Snowflake. These datasets have no unique identifier or primary key defined. The challenge, therefore, is to validate each company's transactions from Hive against the corresponding Snowflake rows.
hive_df
+--------+-----------+-----+------+
|trans_id|  comp_name| dept|  date|
+--------+-----------+-----+------+
|    1101|Johnaon&jho| acnt|2-3-21|
|    1101|Johnaon&jho| acnt|2-3-21|
|    1102|    jeo_cop|sales|3-2-21|
|    1103|       star| acnt|4-2-21|
+--------+-----------+-----+------+
snowflake_df
+--------+-----------+-----+------+
|trans_id|  comp_name| dept|  date|
+--------+-----------+-----+------+
|    1101|Johnaon&jho| acnt|2-3-21|
|    1101|Johnaon&jho| acnt|2-3-21|
|    1102|    jeo_cop|sales|3-2-21|
|    1103|       star| acnt|4-2-21|
+--------+-----------+-----+------+
I have seen many examples that join the two dataframes on a unique id and compare the differences with PySpark, like:
joined_df = source_df.join(dest_df, 'key', 'full')
report = joined_df.filter(source_df[column] != dest_df[column])
But what is the solution when the datasets have no unique identifier? And how do I perform the actual comparison? Any solution would be appreciated.
I have been using the following approach for years: a union all, combined with a groupBy and conditional counts, gives a full picture of the data and handles duplicates gracefully.
Generate the sample dataframes:
cols = ["trans_id", "comp_name", "dept", "date"]

hive_data = [
    ("1101", "Johnaon&jho", "acnt", "2-3-21"),
    ("1101", "Johnaon&jho", "acnt", "2-3-21"),
    ("1102", "jeo_cop", "sales", "3-2-21"),
]
snowflake_data = [
    ("1101", "Johnaon&jho", "acnt", "2-3-21"),
    ("1101", "Johnaon&jho", "acnt", "2-3-21"),
    ("1102", "jeo_cop", "sales", "3-2-21"),
]

hive_df = spark.createDataFrame(hive_data, cols)
snowflake_df = spark.createDataFrame(snowflake_data, cols)
hive_df.show()
+--------+-----------+-----+------+
|trans_id| comp_name| dept| date|
+--------+-----------+-----+------+
| 1101|Johnaon&jho| acnt|2-3-21|
| 1101|Johnaon&jho| acnt|2-3-21|
| 1102| jeo_cop|sales|3-2-21|
+--------+-----------+-----+------+
snowflake_df.show()
+--------+-----------+-----+------+
|trans_id| comp_name| dept| date|
+--------+-----------+-----+------+
| 1101|Johnaon&jho| acnt|2-3-21|
| 1101|Johnaon&jho| acnt|2-3-21|
| 1102| jeo_cop|sales|3-2-21|
+--------+-----------+-----+------+
Union all combined with groupBy:
import pyspark.sql.functions as F

# Tag each row with the system it came from.
hive_df_1 = hive_df.withColumn('source_system', F.lit('hive'))
snowflake_df_1 = snowflake_df.withColumn('source_system', F.lit('snow_flake'))

# Union both sides, group on all original columns, and count how many
# times each distinct row occurs in total and per source system.
df_unionall = (hive_df_1
    .unionAll(snowflake_df_1)
    .groupBy(hive_df.columns)
    .agg(
        F.count("*").alias('total'),
        F.count(F.when(F.col('source_system') == "hive", 1)).alias('hive'),
        F.count(F.when(F.col('source_system') == "snow_flake", 1)).alias('snowflake')
    )
)
df_unionall.show()
+--------+-----------+-----+------+-----+----+---------+
|trans_id| comp_name| dept| date|total|hive|snowflake|
+--------+-----------+-----+------+-----+----+---------+
| 1101|Johnaon&jho| acnt|2-3-21| 4| 2| 2|
| 1102| jeo_cop|sales|3-2-21| 2| 1| 1|
+--------+-----------+-----+------+-----+----+---------+
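From here, surfacing the discrepancies is just a filter on the two counts; rows that agree on both sides (like the two above) drop out. A minimal sketch, reusing df_unionall from above:

# Keep only rows whose occurrence counts differ between the two systems:
# rows present on one side only, or duplicated a different number of times.
df_mismatch = df_unionall.filter(F.col('hive') != F.col('snowflake'))
df_mismatch.show()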
You can use the subtract method. See: here
df = hive_df.subtract(snowflake_df)
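One caveat: subtract has EXCEPT DISTINCT semantics, so it deduplicates before comparing and would miss a row that appears twice in Hive but only once in Snowflake. Since these datasets contain duplicate rows, exceptAll (available since Spark 2.4) is the duplicate-preserving variant; a minimal sketch:

# Rows in hive_df with no matching occurrence in snowflake_df;
# duplicates are kept, so each extra copy shows up in the diff.
missing_in_snowflake = hive_df.exceptAll(snowflake_df)

# The same check in the other direction.
missing_in_hive = snowflake_df.exceptAll(hive_df)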