Sorting a Spark SQL DataFrame with nested values / complex data types



My goal is to collect an ordered list of nested values. The list should be ordered by an element of the nested struct. I have tried different approaches, but I have concerns about both performance and correctness.

Global ordering

import org.apache.spark.sql.functions._

case class Payment(Id: String, Date: String, Paid: Double)
val payments = Seq(
  Payment("mk", "10:00 AM", 8.6D),
  Payment("mk", "06:00 AM", 12.6D),
  Payment("yc", "07:00 AM", 16.6D),
  Payment("yc", "09:00 AM", 2.6D),
  Payment("mk", "11:00 AM", 5.6D)
)
val df = spark.createDataFrame(payments)
// order globally
df.orderBy(col("Paid").desc)
  .groupBy(col("Id"))
  .agg(
    collect_list(struct(col("Date"), col("Paid"))).as("UserPayments")
  )
  .withColumn("LargestPayment", col("UserPayments")(0).getField("Paid"))
  .withColumn("LargestPaymentDate", col("UserPayments")(0).getField("Date"))
  .show(false)
+---+-------------------------------------------------+--------------+------------------+
|Id |UserPayments                                     |LargestPayment|LargestPaymentDate|
+---+-------------------------------------------------+--------------+------------------+
|yc |[[07:00 AM,16.6], [09:00 AM,2.6]]                |16.6          |07:00 AM          |
|mk |[[06:00 AM,12.6], [10:00 AM,8.6], [11:00 AM,5.6]]|12.6          |06:00 AM          |
+---+-------------------------------------------------+--------------+------------------+

This is a naive and straightforward approach, but I have concerns about correctness. Is the list really sorted globally, or only within each partition?

Window function

// use Window
import org.apache.spark.sql.expressions.Window

val window = Window.partitionBy(col("Id")).orderBy(col("Paid").desc)
df.withColumn("rank", rank().over(window))
  .groupBy(col("Id"))
  .agg(
    collect_list(struct(col("rank"), col("Date"), col("Paid"))).as("UserPayments")
  )
  .withColumn("LargestPayment", col("UserPayments")(0).getField("Paid"))
  .withColumn("LargestPaymentDate", col("UserPayments")(0).getField("Date"))
  .show(false)
+---+-------------------------------------------------------+--------------+------------------+
|Id |UserPayments                                           |LargestPayment|LargestPaymentDate|
+---+-------------------------------------------------------+--------------+------------------+
|yc |[[1,07:00 AM,16.6], [2,09:00 AM,2.6]]                  |16.6          |07:00 AM          |
|mk |[[1,06:00 AM,12.6], [2,10:00 AM,8.6], [3,11:00 AM,5.6]]|12.6          |06:00 AM          |
+---+-------------------------------------------------------+--------------+------------------+

Should this work, or am I missing something?

Ordering on the fly in a UDF

// order in UDF
import org.apache.spark.sql.Row

val largestPaymentDate = udf((lr: Seq[Row]) => {
  lr.max(Ordering.by((l: Row) => l.getAs[Double]("Paid"))).getAs[String]("Date")
})
df.groupBy(col("Id"))
  .agg(
    collect_list(struct(col("Date"), col("Paid"))).as("UserPayments")
  )
  .withColumn("LargestPaymentDate", largestPaymentDate(col("UserPayments")))
  .show(false)
+---+-------------------------------------------------+------------------+
|Id |UserPayments                                     |LargestPaymentDate|
+---+-------------------------------------------------+------------------+
|yc |[[07:00 AM,16.6], [09:00 AM,2.6]]                |07:00 AM          |
|mk |[[10:00 AM,8.6], [06:00 AM,12.6], [11:00 AM,5.6]]|06:00 AM          |
+---+-------------------------------------------------+------------------+

In terms of correctness, I guess there is nothing to complain about here. But for subsequent operations I would like the list to already be ordered, without having to sort it explicitly every time.

I tried to write a UDF that takes the list as input and returns an ordered list, but returning a list was too painful and I gave up on it.
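For reference, a rough sketch of the kind of UDF I had in mind (returning plain tuples instead of Rows so Spark can infer an array-of-struct result; the struct fields then come back as _1/_2 rather than Date/Paid):

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf

// Sketch: sort the collected structs by Paid (descending) inside a UDF,
// returning tuples so Spark can infer the result schema.
val sortPayments = udf((lr: Seq[Row]) =>
  lr.map(r => (r.getAs[String]("Date"), r.getAs[Double]("Paid")))
    .sortBy(-_._2)
)

df.groupBy(col("Id"))
  .agg(collect_list(struct(col("Date"), col("Paid"))).as("UserPayments"))
  .withColumn("UserPayments", sortPayments(col("UserPayments")))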

I would reverse the order of the struct fields and aggregate with max:

val result = df
  .groupBy(col("Id"))
  .agg(
    collect_list(struct(col("Date"), col("Paid"))) as "UserPayments",
    max(struct(col("Paid"), col("Date"))) as "MaxPayment"
  )
result.show
// +---+--------------------+---------------+ 
// | Id|        UserPayments|     MaxPayment|
// +---+--------------------+---------------+
// | yc|[[07:00 AM,16.6],...|[16.6,07:00 AM]|
// | mk|[[10:00 AM,8.6], ...|[12.6,06:00 AM]|
// +---+--------------------+---------------+

You can flatten the struct later:

import spark.implicits._  // for the $"colName" syntax

result.select($"id", $"UserPayments", $"MaxPayment.*").show
// +---+--------------------+----+--------+
// | id|        UserPayments|Paid|    Date|
// +---+--------------------+----+--------+
// | yc|[[07:00 AM,16.6],...|16.6|07:00 AM|
// | mk|[[10:00 AM,8.6], ...|12.6|06:00 AM|
// +---+--------------------+----+--------+

The same works with the struct fields reordered and sort_array:

df
  .groupBy(col("Id"))
  .agg(
    sort_array(collect_list(struct(col("Paid"), col("Date")))) as "UserPayments"
  )
  .show(false)
// +---+-------------------------------------------------+
// |Id |UserPayments                                     |
// +---+-------------------------------------------------+
// |yc |[[2.6,09:00 AM], [16.6,07:00 AM]]                |
// |mk |[[5.6,11:00 AM], [8.6,10:00 AM], [12.6,06:00 AM]]|
// +---+-------------------------------------------------+
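sort_array sorts ascending by default. If a descending list is wanted instead, sort_array also accepts an asc flag; a sketch against the same DataFrame:

df
  .groupBy(col("Id"))
  .agg(
    // asc = false sorts the array descending by the first struct field (Paid)
    sort_array(collect_list(struct(col("Paid"), col("Date"))), asc = false) as "UserPayments"
  )
  .show(false)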

Finally:

This is a naive and straightforward approach, but I have concerns about correctness. Is the list really sorted globally, or only within each partition?

The data will be ordered globally, but the order will be destroyed by groupBy, so this is not a solution; it can only work by accident.

repartition (by Id) and sortWithinPartitions (by Id and Paid) should be a reliable replacement.
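A minimal sketch of that alternative, using the column names from the question:

df
  .repartition(col("Id"))                              // one shuffle keyed by Id
  .sortWithinPartitions(col("Id"), col("Paid").desc)   // sort rows inside each partition
  .groupBy(col("Id"))
  .agg(collect_list(struct(col("Date"), col("Paid"))) as "UserPayments")
  .show(false)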
