Removing duplicates from S3-partitioned data based on the partition column, using Spark


I have a partitioned directory structure on S3, as below, which stores parquet files:
date=100000000000
date=111620200621
date=111620202258

The S3 key will look like s3://bucket-name/master/date={a numeric value}

I am reading the data in my Spark code as below:

Dataset<Row> df = spark.read().parquet("s3://bucket-name/master/");
df.createOrReplaceTempView("master"); // this will contain duplicates, because NUM_VALUE can repeat across the S3 partitions
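Since the data is laid out as date=<numeric value> directories, Spark's partition discovery exposes date as a column of the DataFrame. As a quick sanity check (just a small sketch against the df read above), the inferred type of the date column can be inspected like this:

```java
// Print the schema produced by partition discovery; the partitioning column
// "date" appears here, along with the type it was read back as.
df.printSchema();

// Peek at a few rows, including the partition column.
df.select("NAME", "date", "NUM_VALUE").show(5);
```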

The Spark DF looks like below, with duplicate NUM_VALUE:

NAME    date            NUM_VALUE
name1   100000000000    1
name2   111620200621    2
name3   111620202258    2

Expected unique output:

NAME    date            NUM_VALUE
name1   100000000000    1
name3   111620202258    2

I am trying to get the unique, latest data as below:

Dataset<Row> finalDf = spark.sql("SELECT NAME, date, NUM_VALUE FROM (SELECT rank() OVER (PARTITION BY NAME ORDER BY date DESC) rank, * FROM master) temp WHERE (rank = 1)");
finalDf.show();
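For reference, the same rank-based filter can also be written with the DataFrame API instead of SQL; this is only a sketch, assuming the df read above (the name byNameLatestFirst is just for illustration):

```java
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.rank;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;

// Rank rows within each NAME by date, newest first, and keep only the top-ranked row.
WindowSpec byNameLatestFirst = Window.partitionBy("NAME").orderBy(col("date").desc());

Dataset<Row> latest = df
        .withColumn("rank", rank().over(byNameLatestFirst))
        .filter(col("rank").equalTo(1))
        .drop("rank");

latest.show();
```

Note that rank() keeps ties; row_number() would keep exactly one row per NAME.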

But when the SQL query above is executed, I get the following error:

if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 40, date), LongType) AS date#472L
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:292)
at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:594)
at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:594)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of bigint
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.If_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_20$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:289)


For reading, you use:

Dataset<Row> df = spark.read().parquet("s3://bucket-name/master/")

To get the duplicates, use groupBy() with count() (which returns the number of rows per group) and gt(1):

val dups = df.groupBy("NAME", "date", "NUM_VALUE").count().filter(col("count").gt(1))
println(dups.count() + " rows in dups.count")
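Since the question uses the Java API, a rough Java equivalent of the same check would be (a sketch, assuming df is the Dataset<Row> read earlier):

```java
import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Count rows per (NAME, date, NUM_VALUE) combination and keep only groups that occur more than once.
Dataset<Row> dups = df.groupBy("NAME", "date", "NUM_VALUE")
        .count()
        .filter(col("count").gt(1));

System.out.println(dups.count() + " rows in dups");
```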

If you want to remove the duplicates, use dropDuplicates:

val uniqueRowsDF = df.dropDuplicates("NAME", "date", "NUM_VALUE")
println(uniqueRowsDF.count() + " rows after .dropDuplicates")
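And the Java-API counterpart of the dropDuplicates call (again only a sketch against the same df):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Keep one row per distinct (NAME, date, NUM_VALUE) combination.
Dataset<Row> uniqueRowsDF = df.dropDuplicates("NAME", "date", "NUM_VALUE");

System.out.println(uniqueRowsDF.count() + " rows after dropDuplicates");
```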
