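My goal is to collect an ordered list of nested values. The list should be ordered by one of the elements inside the nested values. I have tried several approaches, but I have concerns about both performance and correctness.
Global ordering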
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

case class Payment(Id: String, Date: String, Paid: Double)

val payments = Seq(
  Payment("mk", "10:00 AM", 8.6D),
  Payment("mk", "06:00 AM", 12.6D),
  Payment("yc", "07:00 AM", 16.6D),
  Payment("yc", "09:00 AM", 2.6D),
  Payment("mk", "11:00 AM", 5.6D)
)
val df = spark.createDataFrame(payments)
// order globally
df.orderBy(col("Paid").desc)
  .groupBy(col("Id"))
  .agg(
    collect_list(struct(col("Date"), col("Paid"))).as("UserPayments")
  )
  .withColumn("LargestPayment", col("UserPayments")(0).getField("Paid"))
  .withColumn("LargestPaymentDate", col("UserPayments")(0).getField("Date"))
  .show(false)
+---+-------------------------------------------------+--------------+------------------+
|Id |UserPayments                                     |LargestPayment|LargestPaymentDate|
+---+-------------------------------------------------+--------------+------------------+
|yc |[[07:00 AM,16.6], [09:00 AM,2.6]]                |16.6          |07:00 AM          |
|mk |[[06:00 AM,12.6], [10:00 AM,8.6], [11:00 AM,5.6]]|12.6          |06:00 AM          |
+---+-------------------------------------------------+--------------+------------------+
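This is a naive and straightforward approach, but I have concerns about correctness. Will the list really be ordered globally, or only within a partition?
Window functions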
// use Window
val window = Window.partitionBy(col("Id")).orderBy(col("Paid").desc)
df.withColumn("rank", rank().over(window))
  .groupBy(col("Id"))
  .agg(
    collect_list(struct(col("rank"), col("Date"), col("Paid"))).as("UserPayments")
  )
  .withColumn("LargestPayment", col("UserPayments")(0).getField("Paid"))
  .withColumn("LargestPaymentDate", col("UserPayments")(0).getField("Date"))
  .show(false)
+---+-------------------------------------------------------+--------------+------------------+
|Id |UserPayments                                           |LargestPayment|LargestPaymentDate|
+---+-------------------------------------------------------+--------------+------------------+
|yc |[[1,07:00 AM,16.6], [2,09:00 AM,2.6]]                  |16.6          |07:00 AM          |
|mk |[[1,06:00 AM,12.6], [2,10:00 AM,8.6], [3,11:00 AM,5.6]]|12.6          |06:00 AM          |
+---+-------------------------------------------------------+--------------+------------------+
This should work, or am I missing something?
Ordering on the fly in a UDF
// order in UDF
val largestPaymentDate = udf((lr: Seq[Row]) => {
  lr.max(Ordering.by((l: Row) => l.getAs[Double]("Paid"))).getAs[String]("Date")
})
df.groupBy(col("Id"))
  .agg(
    collect_list(struct(col("Date"), col("Paid"))).as("UserPayments")
  )
  .withColumn("LargestPaymentDate", largestPaymentDate(col("UserPayments")))
  .show(false)
+---+-------------------------------------------------+------------------+
|Id |UserPayments                                     |LargestPaymentDate|
+---+-------------------------------------------------+------------------+
|yc |[[07:00 AM,16.6], [09:00 AM,2.6]]                |07:00 AM          |
|mk |[[10:00 AM,8.6], [06:00 AM,12.6], [11:00 AM,5.6]]|06:00 AM          |
+---+-------------------------------------------------+------------------+
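In terms of correctness, I suppose there is nothing to complain about here. For subsequent operations, however, I would prefer the list to already be ordered, so that I do not have to sort it explicitly every time.
I tried to write a UDF that takes a list as input and returns an ordered list, but returning a list turned out to be too painful and I gave up on it.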
I would reverse the order of the fields in the struct and aggregate with max:
val result = df
  .groupBy(col("Id"))
  .agg(
    collect_list(struct(col("Date"), col("Paid"))) as "UserPayments",
    max(struct(col("Paid"), col("Date"))) as "MaxPayment"
  )
result.show
// +---+--------------------+---------------+
// | Id| UserPayments| MaxPayment|
// +---+--------------------+---------------+
// | yc|[[07:00 AM,16.6],...|[16.6,07:00 AM]|
// | mk|[[10:00 AM,8.6], ...|[12.6,06:00 AM]|
// +---+--------------------+---------------+
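The field order matters because Spark compares structs field by field, from left to right, so with Paid first the largest payment wins. As a rough analogy only, plain Scala tuples behave the same way. The sketch below does not use Spark at all; the object and value names are made up for illustration, and the data is the "mk" user from the question.

```scala
// Plain Scala analogy (no Spark): tuples, like structs, are compared
// field by field, so the first field dominates the ordering.
object StructOrderingSketch {
  // (Paid, Date) pairs for user "mk", taken from the question's sample data
  val mkPayments: Seq[(Double, String)] = Seq(
    (8.6, "10:00 AM"),
    (12.6, "06:00 AM"),
    (5.6, "11:00 AM")
  )

  // Paid comes first, so max picks the largest payment
  val largest: (Double, String) = mkPayments.max

  // With Date first, max is driven by the date string instead
  val byDateFirst: (String, Double) = mkPayments.map(_.swap).max

  def main(args: Array[String]): Unit = {
    println(largest)     // (12.6,06:00 AM)
    println(byDateFirst) // (11:00 AM,5.6)
  }
}
```

This is why struct(col("Paid"), col("Date")) is used for the max aggregation rather than the original (Date, Paid) order.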
You can flatten the struct later:
result.select($"id", $"UserPayments", $"MaxPayment.*").show
// +---+--------------------+----+--------+
// | id| UserPayments|Paid| Date|
// +---+--------------------+----+--------+
// | yc|[[07:00 AM,16.6],...|16.6|07:00 AM|
// | mk|[[10:00 AM,8.6], ...|12.6|06:00 AM|
// +---+--------------------+----+--------+
The same works with sort_array on the reordered struct:
df
  .groupBy(col("Id"))
  .agg(
    sort_array(collect_list(struct(col("Paid"), col("Date")))) as "UserPayments"
  )
  .show(false)
// +---+-------------------------------------------------+
// |Id |UserPayments                                     |
// +---+-------------------------------------------------+
// |yc |[[2.6,09:00 AM], [16.6,07:00 AM]]                |
// |mk |[[5.6,11:00 AM], [8.6,10:00 AM], [12.6,06:00 AM]]|
// +---+-------------------------------------------------+
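Note that sort_array sorts in ascending order by default, so the largest payment ends up last. If you want it first, as in the question, the two-argument overload sort_array(e, asc = false) should do it. Here is a minimal, self-contained sketch under that assumption; the object name, the helper names and the local SparkSession setup are mine and purely illustrative.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, collect_list, sort_array, struct}

// Illustrative sketch: names and the local session are assumptions.
object SortedPaymentsSketch {
  // Sample data from the question
  def sample(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq(
      ("mk", "10:00 AM", 8.6),
      ("mk", "06:00 AM", 12.6),
      ("yc", "07:00 AM", 16.6),
      ("yc", "09:00 AM", 2.6),
      ("mk", "11:00 AM", 5.6)
    ).toDF("Id", "Date", "Paid")
  }

  // Collect (Paid, Date) structs per Id, largest payment first
  def sortedDescending(df: DataFrame): DataFrame =
    df.groupBy(col("Id"))
      .agg(
        sort_array(collect_list(struct(col("Paid"), col("Date"))), asc = false)
          .as("UserPayments")
      )
      // Element 0 is now the largest payment
      .withColumn("LargestPayment", col("UserPayments")(0).getField("Paid"))
      .withColumn("LargestPaymentDate", col("UserPayments")(0).getField("Date"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("sorted-payments-sketch")
      .getOrCreate()
    sortedDescending(sample(spark)).show(false)
    spark.stop()
  }
}
```

Because the sort happens inside the aggregated array itself, the result does not depend on the order in which rows reach collect_list.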
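Finally:
"This is a naive and straightforward approach, but I have concerns about correctness. Will the list really be ordered globally, or only within a partition?"
The data will be ordered globally, but that order is destroyed by groupBy, so this is not a solution; it can work only by accident.
repartition (by Id) followed by sortWithinPartitions (by Id and Paid) should be a reliable replacement.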
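A minimal sketch of that replacement is shown below. The object and helper names and the local SparkSession are my own illustrative assumptions. It also assumes that the groupBy on Id reuses the partitioning created by repartition and keeps the per-partition order; as far as I know, Spark does not document this as a formal guarantee for collect_list, so treat it as a sketch rather than a definitive implementation.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, collect_list, struct}

// Illustrative sketch: names and the local session are assumptions.
object SortWithinPartitionsSketch {
  // Sample data from the question
  def sample(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq(
      ("mk", "10:00 AM", 8.6),
      ("mk", "06:00 AM", 12.6),
      ("yc", "07:00 AM", 16.6),
      ("yc", "09:00 AM", 2.6),
      ("mk", "11:00 AM", 5.6)
    ).toDF("Id", "Date", "Paid")
  }

  def orderedPayments(df: DataFrame): DataFrame =
    df
      // Bring all rows of one Id into the same partition
      .repartition(col("Id"))
      // Sort each partition by Id, then by Paid descending
      .sortWithinPartitions(col("Id"), col("Paid").desc)
      .groupBy(col("Id"))
      .agg(collect_list(struct(col("Date"), col("Paid"))).as("UserPayments"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("sort-within-partitions-sketch")
      .getOrCreate()
    orderedPayments(sample(spark)).show(false)
    spark.stop()
  }
}
```

If you need an ordering that does not depend on how rows arrive at the aggregation, sorting the collected array with sort_array remains the safer option.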