I have a partitioned data structure on S3, as shown below, which stores Parquet files:
date=100000000000
date=111620200621
date=111620202258
The S3 key will look like s3://bucket-name/master/date={a numeric value}
I am reading the data in my Spark code as follows:
Dataset<Row> df = spark.read().parquet("s3://bucket-name/master/");
df.createOrReplaceTempView("master");
// This causes duplicates, because NUM_VALUE can be repeated in each S3 partition
The Spark DataFrame looks like this, with duplicated NUM_VALUE:
NAME   date          NUM_VALUE
name1  100000000000  1
name2  111620200621  2
name3  111620202258  2
Expected unique output:
NAME   date          NUM_VALUE
name1  100000000000  1
name3  111620202258  2
I am trying to get the latest unique data like this:
Dataset<Row> latest = spark.sql("SELECT NAME, date, NUM_VALUE FROM (SELECT rank() OVER (PARTITION BY NAME ORDER BY date DESC) rank, * FROM master) temp WHERE rank = 1");
latest.show();
But when I run the query above, I get the following error:
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 40, date), LongType) AS date#472L
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:292)
at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:594)
at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:594)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of bigint
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.If_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_20$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:289)
Given the way you read the data:

Dataset<Row> df = spark.read().parquet("s3://bucket-name/master/")

To find the duplicates, use groupBy() and count() to get the number of rows per group, then filter with gt(1). If you want to remove the duplicates, use dropDuplicates:

val dups = df.groupBy("NAME", "date", "NUM_VALUE").count.filter(col("count").gt(1))
println(dups.count() + " rows in dups.count")

val uniqueRowsDF = df.dropDuplicates("NAME", "date", "NUM_VALUE")
println(uniqueRowsDF.count() + " rows after .dropDuplicates")
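For reference, the dedup the question's window query aims at (keep only the row with the newest `date` per `NAME`, i.e. `rank() OVER (PARTITION BY NAME ORDER BY date DESC) = 1`) can be sketched with plain Java collections, independent of Spark. The class name and the sample rows below are hypothetical, chosen only to illustrate the logic:

```java
import java.util.*;

// Sketch of "keep the row with the largest date per NAME",
// the semantics of rank() OVER (PARTITION BY NAME ORDER BY date DESC) = 1.
public class LatestPerName {
    // A minimal row holder: (NAME, date, NUM_VALUE).
    static final class Row {
        final String name; final long date; final int numValue;
        Row(String name, long date, int numValue) {
            this.name = name; this.date = date; this.numValue = numValue;
        }
    }

    // For each NAME, keep only the row with the maximum date.
    static List<Row> latestPerName(List<Row> rows) {
        Map<String, Row> latest = new LinkedHashMap<>();
        for (Row r : rows) {
            // merge() keeps the existing row (a) unless the new one (b) is newer.
            latest.merge(r.name, r, (a, b) -> b.date > a.date ? b : a);
        }
        return new ArrayList<>(latest.values());
    }

    public static void main(String[] args) {
        // Hypothetical sample data: name1 appears twice with different dates.
        List<Row> rows = Arrays.asList(
            new Row("name1", 100000000000L, 1),
            new Row("name1", 111620202258L, 5),
            new Row("name2", 111620200621L, 2));
        for (Row r : latestPerName(rows)) {
            System.out.println(r.name + " " + r.date + " " + r.numValue);
        }
        // prints:
        // name1 111620202258 5
        // name2 111620200621 2
    }
}
```

Note that this only mirrors the intended query semantics; it does not reproduce the reported error, which suggests the `date` column was read as a string rather than the expected bigint.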