我有 2 个数据帧,列如下所示。
注意:列uid
不是唯一键,数据帧中存在具有相同 uid 的重复行。
val df1 = spark.read.parquet(args(0)).drop("sv")
val df2 = spark.read.parquet(args(1))
scala> df1.orderBy("uid").show
+----+----+---+
| uid| hid| sv|
+----+----+---+
|uid1|hid2| 10|
|uid1|hid1| 10|
|uid1|hid3| 10|
|uid2|hid1| 2|
|uid3|hid2| 10|
|uid4|hid2| 3|
|uid5|hid3| 5|
+----+----+---+
scala> df2.orderBy("uid").show
+----+----+---+
| uid| pid| sv|
+----+----+---+
|uid1|pid2| 2|
|uid1|pid1| 1|
|uid2|pid1| 2|
|uid3|pid1| 3|
|uid3|pidx|999|
|uid3|pid2| 4|
|uidx|pid1| 2|
+----+----+---+
scala> df1.drop("sv")
.join(df2, "uid")
.groupBy("hid", "pid")
.agg(count("*") as "xcnt", sum("sv") as "xsum", avg("sv") as "xavg")
.orderBy("hid").show
+----+----+----+----+-----+
| hid| pid|xcnt|xsum| xavg|
+----+----+----+----+-----+
|hid1|pid1| 2| 3| 1.5|
|hid1|pid2| 1| 2| 2.0|
|hid2|pid2| 2| 6| 3.0|
|hid2|pidx| 1| 999|999.0|
|hid2|pid1| 2| 4| 2.0|
|hid3|pid1| 1| 1| 1.0|
|hid3|pid2| 1| 2| 2.0|
+----+----+----+----+-----+
在此演示案例中,一切看起来都不错。
但是当我对生产大数据应用相同的操作时,最终输出包含许多重复的行(同一(hid, pid)
对(。
我虽然groupBy
运算符会像select distinct hid, pid from ...
,但显然不是。
那么我的操作出了什么问题呢?我应该按hid, pid
重新分区数据帧吗?
谢谢!
--更新
如果我在联接数据帧后添加.drop("uid")
,则最终输出中会缺少一些行。
scala> df1.drop("sv")
.join(df2, "uid").drop("uid")
.groupBy("hid", "pid")
.agg(count("*") as "xcnt", sum("sv") as "xsum", avg("sv") as "xavg")
.orderBy("hid").show
老实说,我认为数据有问题,而不是代码。当然,如果pid
和hid
确实不同,则不应该有任何重复(我之前在数据中看到过一些流氓西里尔符号(。
要调试此问题,您可以尝试查看"uid"和sv
值的组合表示每个重复行。
df1.drop( "sv" )
.join(df2, "uid")
.groupBy( "hid", "pid" )
.agg( collect_list( "uid" ), collect_list( "sv" ) )
.orderBy( "hid" )
.show
之后,您将有一些起点来评估数据。或者,如果uid
(和"sv"(列表相同,请提交错误。
我想我可能已经找到了根本原因。
也许这是由 AWS S3 一致性模型引起的。
背景是,我提交了 2 个Spark 作业来创建 2 个表,并提交了第三个任务来连接这两个表(我拆分它们以防它们中的任何一个失败并且我不需要重新运行它们(。
我将这 3 spark-submit
放在按顺序运行的 shell 脚本中,并得到了重复行的结果。
当我刚才重新运行上一个作业时,结果似乎不错。