SparkSQL - join&groupBy 后获得重复行



我有 2 个数据帧,列如下所示。
注意:列uid不是唯一键,数据帧中存在具有相同 uid 的重复行。

val df1 = spark.read.parquet(args(0)).drop("sv")
val df2 = spark.read.parquet(args(1))
scala> df1.orderBy("uid").show
+----+----+---+
| uid| hid| sv|
+----+----+---+
|uid1|hid2| 10|
|uid1|hid1| 10|
|uid1|hid3| 10|
|uid2|hid1|  2|
|uid3|hid2| 10|
|uid4|hid2|  3|
|uid5|hid3|  5|
+----+----+---+
scala> df2.orderBy("uid").show
+----+----+---+
| uid| pid| sv|
+----+----+---+
|uid1|pid2|  2|
|uid1|pid1|  1|
|uid2|pid1|  2|
|uid3|pid1|  3|
|uid3|pidx|999|
|uid3|pid2|  4|
|uidx|pid1|  2|
+----+----+---+
scala> df1.drop("sv")
  .join(df2, "uid")
  .groupBy("hid", "pid")
  .agg(count("*") as "xcnt", sum("sv") as "xsum", avg("sv") as "xavg")
  .orderBy("hid").show
+----+----+----+----+-----+
| hid| pid|xcnt|xsum| xavg|
+----+----+----+----+-----+
|hid1|pid1|   2|   3|  1.5|
|hid1|pid2|   1|   2|  2.0|
|hid2|pid2|   2|   6|  3.0|
|hid2|pidx|   1| 999|999.0|
|hid2|pid1|   2|   4|  2.0|
|hid3|pid1|   1|   1|  1.0|
|hid3|pid2|   1|   2|  2.0|
+----+----+----+----+-----+

在此演示案例中,一切看起来都不错。

但是当我对生产大数据应用相同的操作时,最终输出包含许多重复的行(同一(hid, pid)对(。
我虽然groupBy运算符会像select distinct hid, pid from ...,但显然不是。

那么我的操作出了什么问题呢?我应该按hid, pid重新分区数据帧吗?
谢谢!

--更新
如果我在联接数据帧后添加.drop("uid"),则最终输出中会缺少一些行。

scala> df1.drop("sv")
  .join(df2, "uid").drop("uid")
  .groupBy("hid", "pid")
  .agg(count("*") as "xcnt", sum("sv") as "xsum", avg("sv") as "xavg")
  .orderBy("hid").show

老实说,我认为数据有问题,而不是代码。当然,如果pidhid确实不同,则不应该有任何重复(我之前在数据中看到过一些流氓西里尔符号(。

要调试此问题,您可以尝试查看"uid"和sv值的组合表示每个重复行。

df1.drop( "sv" )
  .join(df2, "uid")
  .groupBy( "hid", "pid" )
  .agg( collect_list( "uid" ), collect_list( "sv" ) )
  .orderBy( "hid" )
  .show

之后,您将有一些起点来评估数据。或者,如果uid(和"sv"(列表相同,请提交错误。

我想我可能已经找到了根本原因。

也许这是由 AWS S3 一致性模型引起的。

背景是,我提交了 2 个

Spark 作业来创建 2 个表,并提交了第三个任务来连接这两个表(我拆分它们以防它们中的任何一个失败并且我不需要重新运行它们(。
我将这 3 spark-submit放在按顺序运行的 shell 脚本中,并得到了重复行的结果。
当我刚才重新运行上一个作业时,结果似乎不错。

相关内容

  • 没有找到相关文章

最新更新